aoc/util/thread.rs

//! Utility methods to spawn a number of
//! [scoped](https://doc.rust-lang.org/stable/std/thread/fn.scope.html)
//! threads equal to the number of cores on the machine. Unlike normal threads, scoped threads
//! can borrow data from their environment.
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::thread::*;

// Usually the number of logical cores (hardware threads) available to the program.
fn threads() -> usize {
    available_parallelism().unwrap().get()
}

/// Spawn `n` scoped threads, where `n` is the available parallelism.
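///
/// # Examples
///
/// A minimal usage sketch, assuming the crate exposes this module as `aoc::util::thread`:
///
/// ```
/// use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
///
/// // Import path assumed from this file's location; adjust to the actual crate layout.
/// use aoc::util::thread::spawn;
///
/// let counter = AtomicUsize::new(0);
///
/// // Scoped threads may borrow `counter` directly from the enclosing stack frame.
/// spawn(|| {
///     counter.fetch_add(1, Relaxed);
/// });
///
/// // `spawn` joins every thread before returning, so all increments are visible here.
/// assert!(counter.load(Relaxed) >= 1);
/// ```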
pub fn spawn<F>(f: F)
where
    F: Fn() + Copy + Send,
{
    scope(|scope| {
        for _ in 0..threads() {
            scope.spawn(f);
        }
    });
}

/// Spawns `n` scoped threads that each receive a
/// [work stealing](https://en.wikipedia.org/wiki/Work_stealing) iterator.
/// Work stealing is an efficient strategy that keeps each CPU core busy when some items take longer
/// than others to process, used by popular libraries such as [rayon](https://github.com/rayon-rs/rayon).
/// Processing at different rates also happens on many modern CPUs with
/// [heterogeneous performance and efficiency cores](https://en.wikipedia.org/wiki/ARM_big.LITTLE).
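///
/// # Examples
///
/// A minimal usage sketch that sums a slice in parallel, assuming the crate exposes this module as
/// `aoc::util::thread`:
///
/// ```
/// use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
///
/// // Import path assumed from this file's location; adjust to the actual crate layout.
/// use aoc::util::thread::spawn_parallel_iterator;
///
/// let items: Vec<usize> = (1..=100).collect();
/// let total = AtomicUsize::new(0);
///
/// spawn_parallel_iterator(&items, |iter| {
///     // Each worker sums the items it receives, including any it steals from other workers.
///     let partial: usize = iter.copied().sum();
///     total.fetch_add(partial, Relaxed);
/// });
///
/// assert_eq!(total.load(Relaxed), 5050);
/// ```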
pub fn spawn_parallel_iterator<F, T>(items: &[T], f: F)
where
    F: Fn(ParIter<'_, T>) + Copy + Send,
    T: Sync,
{
    let threads = threads();
    let size = items.len().div_ceil(threads);

    // Initially divide the work as evenly as possible among the worker threads.
    let workers: Vec<_> = (0..threads)
        .map(|id| {
            let start = (id * size).min(items.len());
            let end = (start + size).min(items.len());
            CachePadding::new(pack(start, end))
        })
        .collect();
    let workers = workers.as_slice();

    scope(|scope| {
        for id in 0..threads {
            scope.spawn(move || f(ParIter { id, items, workers }));
        }
    });
}

pub struct ParIter<'a, T> {
    id: usize,
    items: &'a [T],
    workers: &'a [CachePadding],
}

impl<'a, T> Iterator for ParIter<'a, T> {
    type Item = &'a T;

    fn next(&mut self) -> Option<&'a T> {
        // First try taking from our own queue.
        let worker = &self.workers[self.id];
        let current = worker.increment();
        let (start, end) = unpack(current);

        // There are still items to process.
        if start < end {
            return Some(&self.items[start]);
        }

        // Steal from another worker, [spinlocking](https://en.wikipedia.org/wiki/Spinlock)
        // until we acquire new items to process or there's nothing left to do.
        loop {
            // Find the worker with the most remaining items.
            let available = self
                .workers
                .iter()
                .filter_map(|other| {
                    let current = other.load();
                    let (start, end) = unpack(current);
                    let size = end.saturating_sub(start);

                    (size > 0).then_some((other, current, size))
                })
                .max_by_key(|t| t.2);

            if let Some((other, current, size)) = available {
                // Split the work items into two roughly equal piles.
                let (start, end) = unpack(current);
                let middle = start + size.div_ceil(2);

                let next = pack(middle, end);
                let stolen = pack(start + 1, middle);

                // We could be preempted by another thread stealing or by the owning worker
                // thread finishing an item, so check that the indices are still unmodified.
                if other.compare_exchange(current, next) {
                    worker.store(stolen);
                    break Some(&self.items[start]);
                }
            } else {
                // No work remaining.
                break None;
            }
        }
    }
}

/// Intentionally force alignment to 128 bytes to make a best-effort attempt to place each atomic
/// on its own cache line. This reduces contention and improves performance for common
/// CPU caching protocols such as [MESI](https://en.wikipedia.org/wiki/MESI_protocol).
#[repr(align(128))]
pub struct CachePadding {
    atomic: AtomicUsize,
}

/// Convenience wrapper methods around atomic operations. Both start and end indices are packed
/// into a single atomic so that we can use `Relaxed` ordering, the fastest memory ordering and the
/// easiest to reason about.
impl CachePadding {
    #[inline]
    fn new(n: usize) -> Self {
        CachePadding { atomic: AtomicUsize::new(n) }
    }

    #[inline]
    fn increment(&self) -> usize {
        self.atomic.fetch_add(1, Relaxed)
    }

    #[inline]
    fn load(&self) -> usize {
        self.atomic.load(Relaxed)
    }

    #[inline]
    fn store(&self, n: usize) {
        self.atomic.store(n, Relaxed);
    }

    #[inline]
    fn compare_exchange(&self, current: usize, new: usize) -> bool {
        self.atomic.compare_exchange(current, new, Relaxed, Relaxed).is_ok()
    }
}

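// `end` occupies the high 32 bits of the packed value and `start` the low 32 bits, so adding 1 to
// the packed value advances only the start index.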
#[inline]
fn pack(start: usize, end: usize) -> usize {
    (end << 32) | start
}

#[inline]
fn unpack(both: usize) -> (usize, usize) {
    (both & 0xffffffff, both >> 32)
}
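
// A minimal sketch of unit tests exercising the 32-bit packing scheme and the cache line padding,
// assuming that worker indices fit in 32 bits as the packing layout above requires.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn pack_then_unpack_round_trips() {
        assert_eq!(unpack(pack(3, 10)), (3, 10));
        assert_eq!(unpack(pack(0, 0)), (0, 0));
    }

    #[test]
    fn increment_advances_only_the_start_index() {
        let worker = CachePadding::new(pack(0, 4));
        // `increment` returns the previous packed value, then bumps the low half (the start index).
        assert_eq!(unpack(worker.increment()), (0, 4));
        assert_eq!(unpack(worker.load()), (1, 4));
    }

    #[test]
    fn cache_padding_is_aligned_to_128_bytes() {
        assert_eq!(std::mem::align_of::<CachePadding>(), 128);
    }
}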