1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
//! A linked list of debt nodes.
//!
//! A node may or may not be owned by a thread. A reader allocates its debts in the node it owns,
//! while a writer walks all the nodes (but may also use some values from the node it owns).
//!
//! The list is prepend-only ‒ if a thread dies, its node lives on (and can be claimed by another
//! thread later on). This makes the implementation much simpler, since everything here is
//! `'static` and we don't have to care about knowing when to free stuff.
//!
//! The nodes contain both the fast primary slots and the secondary fallback ones.
//!
//! # Synchronization
//!
//! We synchronize several things here.
//!
//! The addition of nodes is synchronized through the head (a load on each read, AcqRel on each
//! attempt to add another node). Note that certain parts never change after that (they aren't even
//! atomic) and other things that do change take care of themselves (the debt slots have their own
//! synchronization, etc).
//!
//! The ownership of a node follows an acquire-release lock pattern.
//!
//! Similarly, the counting of active writers is an acquire-release lock pattern.
//!
//! We also do a release-acquire "send" from start-cooldown to check-cooldown to make sure we see
//! a value of the writers count at least as up to date as when the cooldown started. That way, if
//! we see 0, we know it must have happened since then.

use std::cell::Cell;
use std::ptr;
use std::slice::Iter;
use std::sync::atomic::Ordering::*;
use std::sync::atomic::{AtomicPtr, AtomicUsize};

use super::fast::{Local as FastLocal, Slots as FastSlots};
use super::helping::{Local as HelpingLocal, Slots as HelpingSlots};
use super::Debt;
use crate::RefCnt;

/// Node state: the node sits in the list but nobody owns it; `Node::get` may claim it.
const NODE_UNUSED: usize = 0;
/// Node state: the node is claimed. Freshly created nodes start in this state.
const NODE_USED: usize = 1;
/// Node state: the owner gave the node up (`Node::start_cooldown`), but it is not yet reusable.
/// `Node::check_cooldown` moves it to `NODE_UNUSED` once it observes no active writers.
const NODE_COOLDOWN: usize = 2;

/// The head of the debt linked list.
///
/// Null until the first node is prepended. New nodes are only ever added at the head (see
/// `Node::get`); traversal starts here (see `Node::traverse`).
static LIST_HEAD: AtomicPtr<Node> = AtomicPtr::new(ptr::null_mut());

/// A guard recording that a writer is currently working with a node.
///
/// It is handed out by `Node::reserve_writer`, which increments the node's `active_writers`
/// counter. Dropping the guard decrements the counter again.
pub struct NodeReservation<'a>(&'a Node);

impl Drop for NodeReservation<'_> {
    fn drop(&mut self) {
        // Release ‒ the counterpart of the Acquire increment done when the guard was created.
        let NodeReservation(node) = self;
        node.active_writers.fetch_sub(1, Release);
    }
}

/// One thread-local node for debts.
///
/// Nodes are created by `Node::get`, leaked and linked into the global list behind `LIST_HEAD`.
/// A node is at any time in one of the `NODE_UNUSED`, `NODE_USED` or `NODE_COOLDOWN` states.
// NOTE(review): the 64-byte alignment presumably keeps nodes on separate cache lines ‒ confirm.
#[repr(C, align(64))]
pub(crate) struct Node {
    /// The fast (primary) debt slots.
    fast: FastSlots,
    /// The helping (fallback) slot and its bookkeeping.
    helping: HelpingSlots,
    /// The current state of the node, one of the `NODE_*` constants.
    in_use: AtomicUsize,
    // Next node in the list.
    //
    // It is a pointer because we touch it before synchronization (we don't _dereference_ it before
    // synchronization, only manipulate the pointer itself). That is illegal according to strict
    // interpretation of the rules by MIRI on references.
    next: *const Node,
    /// Number of currently live `NodeReservation`s for this node.
    active_writers: AtomicUsize,
}

impl Default for Node {
    /// Creates a detached node.
    ///
    /// The node starts out already claimed (`NODE_USED`), with no successor and no active
    /// writers; the slots take their own default values.
    fn default() -> Self {
        let fast = FastSlots::default();
        let helping = HelpingSlots::default();
        Self {
            fast,
            helping,
            // The creator owns the node right away, so nobody else may claim it.
            in_use: AtomicUsize::new(NODE_USED),
            // Not linked anywhere yet; the successor is filled in when the node is prepended.
            next: ptr::null(),
            active_writers: AtomicUsize::new(0),
        }
    }
}

impl Node {
    /// Walks the debt linked list from the head.
    ///
    /// The closure is called on every node in turn. The first `Some` it returns stops the walk
    /// and becomes the result; if the closure never returns `Some`, the result is `None`.
    pub(crate) fn traverse<R, F: FnMut(&'static Node) -> Option<R>>(mut f: F) -> Option<R> {
        // Acquire ‒ we want to make sure we read the correct version of data at the end of the
        // pointer. Any write to the LIST_HEAD is with Release.
        //
        // Furthermore, we need to see the newest version of the list in case we examine the debts
        // - if a new one is added recently, we don't want a stale read -> SeqCst.
        //
        // Note that the other pointers in the chain never change and are *ordinary* pointers. The
        // whole linked list is synchronized through the head.
        let mut cursor: *const Node = LIST_HEAD.load(SeqCst);
        // SAFETY: Every non-null pointer in the list comes from a leaked `Box<Node>` (see `get`),
        // so it stays valid forever.
        while let Some(node) = unsafe { cursor.as_ref() } {
            match f(node) {
                None => cursor = node.next,
                found => return found,
            }
        }
        None
    }

    /// Sends the node owned by the current thread into cooldown.
    ///
    /// # Panics
    ///
    /// Panics if the node was not in the `NODE_USED` state.
    fn start_cooldown(&self) {
        // Trick: Taking a reservation gives this thread an up to date value of active_writers,
        // so the Release swap below publishes it. The reservation is given back when this
        // function returns.
        let _reservation = self.reserve_writer();
        let previous = self.in_use.swap(NODE_COOLDOWN, Release);
        assert_eq!(NODE_USED, previous);
    }

    /// Finishes the cooldown of this node if it is ready for that.
    ///
    /// A node in `NODE_COOLDOWN` with no active writers is moved to `NODE_UNUSED`; in any other
    /// situation nothing happens.
    ///
    /// See the ABA protection at the [helping].
    fn check_cooldown(&self) {
        // The Acquire load serves two purposes:
        // * Most nodes are not in cooldown and get skipped cheaply.
        // * More importantly, it syncs the value of active_writers to be at least the value from
        //   when the cooldown started. That way the 0 observed below is known to have happened
        //   some time after start_cooldown.
        if self.in_use.load(Acquire) != NODE_COOLDOWN {
            return;
        }
        // The rest can be nicely relaxed ‒ no memory is being synchronized by these operations.
        // We just see an up to date 0 and allow someone (possibly us) to claim the node later on.
        if self.active_writers.load(Relaxed) != 0 {
            return;
        }
        // A failed exchange means somebody else already changed the state; nothing more to do.
        let _ = self
            .in_use
            .compare_exchange(NODE_COOLDOWN, NODE_UNUSED, Relaxed, Relaxed);
    }

    /// Mark this node that a writer is currently playing with it.
    ///
    /// Increments the `active_writers` counter and returns a guard borrowing this node. The
    /// counter is decremented again when the guard is dropped, so the guard must be kept alive
    /// for as long as the writer works with the node.
    pub fn reserve_writer(&self) -> NodeReservation<'_> {
        // Acquire ‒ pairs with the Release decrement in the guard's `Drop`.
        self.active_writers.fetch_add(1, Acquire);
        NodeReservation(self)
    }

    /// "Allocates" a node for the caller.
    ///
    /// An unused node already present in the list is reused if there is one. Otherwise a new
    /// node is created, leaked and prepended to the list. Either way the returned node is
    /// claimed (`NODE_USED`).
    fn get() -> &'static Self {
        // Try to find an unused one in the chain and reuse it.
        let reused = Self::traverse(|candidate| {
            candidate.check_cooldown();
            candidate
                .in_use
                // We claim a unique control over the generation and the right to write to slots
                // if they are NO_DEBT
                .compare_exchange(NODE_UNUSED, NODE_USED, SeqCst, Relaxed)
                .ok()
                .map(|_| candidate)
        });
        if let Some(node) = reused {
            return node;
        }

        // If that didn't work, create a new one and prepend to the list.
        let fresh = Box::leak(Box::<Node>::default());
        fresh.helping.init();
        // We don't want to read any data in addition to the head, Relaxed is fine here.
        //
        // We do need to release the data to others, but for that, we acquire in the
        // compare_exchange below.
        let mut expected = LIST_HEAD.load(Relaxed);
        loop {
            fresh.next = expected;
            // We need to release *the whole chain* here. For that, we need to acquire it first.
            //
            // SeqCst because we need to make sure it is properly set "before" we do anything to
            // the debts.
            //
            // On failure nothing was changed, so Relaxed is enough there.
            match LIST_HEAD.compare_exchange_weak(expected, fresh, SeqCst, Relaxed) {
                Ok(_) => return fresh,
                // Somebody else moved the head (or the weak exchange failed spuriously); go for
                // the next round of the loop with the value we have just seen.
                Err(actual) => expected = actual,
            }
        }
    }

    /// Iterate over the fast slots.
    ///
    /// The returned iterator borrows from this node.
    pub(crate) fn fast_slots(&self) -> Iter<'_, Debt> {
        self.fast.into_iter()
    }

    /// Gives access to the debt slot of the helping strategy.
    pub(crate) fn helping_slot(&self) -> &Debt {
        let slots = &self.helping;
        slots.slot()
    }
}

/// A wrapper around a node pointer, to un-claim the node on thread shutdown.
///
/// Dropping the wrapper sends the node it holds (if any) into cooldown, so the node can later be
/// claimed again through `Node::get`.
pub(crate) struct LocalNode {
    /// Node for this thread, if any.
    ///
    /// We don't necessarily have to own one, but if we don't, we'll get one before the first use.
    /// It is also cleared when the node is sent into cooldown by `new_helping`.
    node: Cell<Option<&'static Node>>,

    /// Thread-local data for the fast slots.
    fast: FastLocal,

    /// Thread local data for the helping strategy.
    helping: HelpingLocal,
}

impl LocalNode {
    /// Runs `f` with the local node of the current thread.
    ///
    /// If the thread does not hold a node yet, one is acquired through `Node::get` first. If the
    /// thread-local storage is no longer accessible, a temporary `LocalNode` is used instead.
    pub(crate) fn with<R, F: FnOnce(&LocalNode) -> R>(f: F) -> R {
        // The callback is consumed by exactly one of the two paths below. Parking it in an
        // `Option` lets the fallback path get it back when the thread-local closure never ran.
        let mut pending = Some(f);
        let attempt = THREAD_HEAD.try_with(|local| {
            if local.node.get().is_none() {
                local.node.set(Some(Node::get()));
            }
            let callback = pending.take().unwrap();
            callback(local)
        });
        match attempt {
            Ok(outcome) => outcome,
            // During the application shutdown, the thread local storage may be already
            // deallocated. In that case, the above fails but we still need something. So we just
            // find or allocate a node and use it just once.
            //
            // Note that the situation should be very very rare and not happen often, so the slower
            // performance doesn't matter that much.
            Err(_) => {
                let fallback = LocalNode {
                    node: Cell::new(Some(Node::get())),
                    fast: FastLocal::default(),
                    helping: HelpingLocal::default(),
                };
                let callback = pending.take().unwrap();
                callback(&fallback)
                // Drop of fallback -> sends the node we just used into cooldown.
            }
        }
    }

    /// Creates a new debt in one of the fast slots.
    ///
    /// The debt for the given pointer (untyped, cast into a usize) is stored in a slot of the
    /// node held by this thread. Returns a reference to that slot, or `None` if all the slots
    /// are currently full.
    ///
    /// # Panics
    ///
    /// Panics if no node is set, which `LocalNode::with` prevents.
    #[inline]
    pub(crate) fn new_fast(&self, ptr: usize) -> Option<&'static Debt> {
        let owned = self
            .node
            .get()
            .expect("LocalNode::with ensures it is set");
        debug_assert_eq!(owned.in_use.load(Relaxed), NODE_USED);
        owned.fast.get_debt(ptr, &self.fast)
    }

    /// Starts a helping slot transaction.
    ///
    /// Returns the generation (with tag). If the helping slots ask for the node to be discarded,
    /// the node is sent into cooldown and this `LocalNode` lets go of it.
    ///
    /// # Panics
    ///
    /// Panics if no node is set, which `LocalNode::with` prevents.
    pub(crate) fn new_helping(&self, ptr: usize) -> usize {
        let owned = self
            .node
            .get()
            .expect("LocalNode::with ensures it is set");
        debug_assert_eq!(owned.in_use.load(Relaxed), NODE_USED);
        let (generation, discard) = owned.helping.get_debt(ptr, &self.helping);
        if !discard {
            return generation;
        }
        // Too many generations happened, make sure the writers give the poor node a break for
        // a while so they don't observe the generation wrapping around.
        owned.start_cooldown();
        self.node.set(None);
        generation
    }

    /// Confirms a helping transaction.
    ///
    /// The generation must come from a previous `new_helping`.
    ///
    /// Returns `Ok` with the debt holding the pointer, or `Err` with a debt to pay together with
    /// a replacement (already protected) address.
    ///
    /// # Panics
    ///
    /// Panics if no node is set, which `LocalNode::with` prevents.
    pub(crate) fn confirm_helping(
        &self,
        gen: usize,
        ptr: usize,
    ) -> Result<&'static Debt, (&'static Debt, usize)> {
        let owned = self
            .node
            .get()
            .expect("LocalNode::with ensures it is set");
        debug_assert_eq!(owned.in_use.load(Relaxed), NODE_USED);
        // Both outcomes refer to the same helping slot of our node.
        let slot = owned.helping_slot();
        match owned.helping.confirm(gen, ptr) {
            Ok(()) => Ok(slot),
            Err(replacement) => Err((slot, replacement)),
        }
    }

    /// The writer side of a helping slot.
    ///
    /// Potentially helps the `who` node by loading the address that node is trying to load. The
    /// node held by `self` acts as the local node and must be a different one than `who`.
    ///
    /// # Panics
    ///
    /// Panics if no node is set, which `LocalNode::with` prevents.
    pub(super) fn help<R, T>(&self, who: &Node, storage_addr: usize, replacement: &R)
    where
        T: RefCnt,
        R: Fn() -> T,
    {
        let mine = self
            .node
            .get()
            .expect("LocalNode::with ensures it is set");
        debug_assert_eq!(mine.in_use.load(Relaxed), NODE_USED);
        let theirs = &who.helping;
        mine.helping.help(theirs, storage_addr, replacement);
    }
}

impl Drop for LocalNode {
    /// Gives up the node held by this `LocalNode`, if there is one, by sending it into cooldown.
    fn drop(&mut self) {
        let owned = match self.node.get() {
            Some(node) => node,
            // We never claimed a node (or already gave it up), nothing to release.
            None => return,
        };
        // Release - syncing writes/ownership of this Node
        owned.start_cooldown();
    }
}

thread_local! {
    /// A debt node assigned to this thread.
    ///
    /// It starts without any node; `LocalNode::with` claims one on first use. When the thread
    /// local is destroyed, the `Drop` of `LocalNode` sends the node into cooldown.
    static THREAD_HEAD: LocalNode = LocalNode {
        node: Cell::new(None),
        fast: FastLocal::default(),
        helping: HelpingLocal::default(),
    };
}

#[cfg(test)]
mod tests {
    use super::*;

    impl Node {
        /// Checks that no slot of the node (fast or helping) holds a debt.
        fn is_empty(&self) -> bool {
            let vacant = |debt: &Debt| debt.0.load(Relaxed) == Debt::NONE;
            self.fast_slots().all(vacant) && vacant(self.helping_slot())
        }

        /// Returns the node assigned to the current thread, claiming one if needed.
        fn get_thread() -> &'static Self {
            LocalNode::with(|local| local.node.get().unwrap())
        }
    }

    /// A node freshly acquired for the thread has no debts in it.
    #[test]
    fn new_empty() {
        assert!(Node::get_thread().is_empty());
    }
}