aoc/util/thread.rs

//! Utility methods to spawn a number of
//! [scoped](https://doc.rust-lang.org/stable/std/thread/fn.scope.html)
//! threads equal to the number of cores on the machine. Unlike normal threads, scoped threads
//! can borrow data from their environment.
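//!
//! A minimal borrowing sketch (assuming this module is reachable as `aoc::util::thread`), where
//! every thread reads a locally owned `Vec` by reference:
//!
//! ```
//! # use aoc::util::thread::spawn;
//! let data = vec![1, 2, 3];
//! // Each scoped thread borrows `data` directly, with no `Arc` or `'static` bound required.
//! let sums = spawn(|| data.iter().sum::<i32>());
//! assert!(sums.iter().all(|&sum| sum == 6));
//! ```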
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering::Relaxed};
use std::thread::*;

/// Usually the number of logical cores available to the program.
pub fn threads() -> usize {
    available_parallelism().unwrap().get()
}

/// Spawn `n` scoped threads, where `n` is the available parallelism.
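///
/// Usage sketch (assuming this module is reachable as `aoc::util::thread`), showing that one
/// result is collected per spawned thread:
///
/// ```
/// # use aoc::util::thread::{spawn, threads};
/// let results = spawn(|| 1_u32);
/// // Every thread runs the same closure, so there is one result per thread.
/// assert_eq!(results.len(), threads());
/// assert!(results.iter().all(|&result| result == 1));
/// ```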
pub fn spawn<F, R>(f: F) -> Vec<R>
where
    F: Fn() -> R + Copy + Send,
    R: Send,
{
    scope(|scope| {
        let mut handles = Vec::new();

        for _ in 0..threads() {
            let handle = scope.spawn(f);
            handles.push(handle);
        }

        handles.into_iter().flat_map(ScopedJoinHandle::join).collect()
    })
}

/// Spawns `n` scoped threads that each receive a
/// [work stealing](https://en.wikipedia.org/wiki/Work_stealing) iterator.
/// Work stealing is an efficient strategy that keeps each CPU core busy when some items take longer
/// than others to process, used by popular libraries such as [rayon](https://github.com/rayon-rs/rayon).
/// Processing at different rates also happens on many modern CPUs with
/// [heterogeneous performance and efficiency cores](https://en.wikipedia.org/wiki/ARM_big.LITTLE).
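///
/// Usage sketch (assuming this module is reachable as `aoc::util::thread`): each worker receives
/// a `ParIter` that yields borrowed items and returns a partial result.
///
/// ```
/// # use aoc::util::thread::spawn_parallel_iterator;
/// let items: Vec<u32> = (1..=100).collect();
/// // Each thread sums only the items it ends up processing.
/// let partial_sums = spawn_parallel_iterator(&items, |iter| iter.sum::<u32>());
/// // Every item is processed exactly once, so the partial sums always total 5050.
/// assert_eq!(partial_sums.iter().sum::<u32>(), 5050);
/// ```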
pub fn spawn_parallel_iterator<F, R, T>(items: &[T], f: F) -> Vec<R>
where
    F: Fn(ParIter<'_, T>) -> R + Copy + Send,
    R: Send,
    T: Sync,
{
    let threads = threads();
    let size = items.len().div_ceil(threads);

    // Initially divide the work as evenly as possible among the worker threads.
    let workers: Vec<_> = (0..threads)
        .map(|id| {
            let start = (id * size).min(items.len());
            let end = (start + size).min(items.len());
            CachePadding::new(pack(start, end))
        })
        .collect();
    let workers = workers.as_slice();

    scope(|scope| {
        let mut handles = Vec::new();

        for id in 0..threads {
            let handle = scope.spawn(move || f(ParIter { id, items, workers }));
            handles.push(handle);
        }

        handles.into_iter().flat_map(ScopedJoinHandle::join).collect()
    })
}

pub struct ParIter<'a, T> {
    id: usize,
    items: &'a [T],
    workers: &'a [CachePadding],
}

impl<'a, T> Iterator for ParIter<'a, T> {
    type Item = &'a T;

    fn next(&mut self) -> Option<&'a T> {
        // First try taking from our own queue.
        let worker = &self.workers[self.id];
        let current = worker.increment();
        let (start, end) = unpack(current);

        // There are still items to process.
        if start < end {
            return Some(&self.items[start]);
        }

        // Steal from another worker, [spinlocking](https://en.wikipedia.org/wiki/Spinlock)
        // until we acquire new items to process or there's nothing left to do.
        loop {
            // Find the worker with the most remaining items, breaking out of the loop
            // and returning `None` if there is no work remaining.
            let (other, current, size) = self
                .workers
                .iter()
                .filter_map(|other| {
                    let current = other.load();
                    let (start, end) = unpack(current);
                    let size = end.saturating_sub(start);

                    (size > 0).then_some((other, current, size))
                })
                .max_by_key(|&(_, _, size)| size)?;

            // Split the work items into two roughly equal piles.
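            // Worked example with illustrative numbers: a victim range of 10..20 splits at the
            // midpoint 15, so the victim keeps 15..20 while we keep 11..15 and return item 10.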
            let (start, end) = unpack(current);
            let middle = start + size.div_ceil(2);

            let next = pack(middle, end);
            let stolen = pack(start + 1, middle);

            // We could be preempted by another thread stealing or by the owning worker
            // thread finishing an item, so check that the indices are still unmodified.
            if other.compare_exchange(current, next) {
                worker.store(stolen);
                break Some(&self.items[start]);
            }
        }
    }
}

/// Intentionally force alignment to 128 bytes to make a best effort attempt to place each atomic
/// on its own cache line. This reduces contention and improves performance for common
/// CPU caching protocols such as [MESI](https://en.wikipedia.org/wiki/MESI_protocol).
#[repr(align(128))]
pub struct CachePadding {
    atomic: AtomicUsize,
}
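
// Compile time sanity check sketch (not from the original source): `repr(align(128))` also rounds
// the struct size up to 128 bytes, so adjacent array elements never share a cache line.
const _: () = assert!(std::mem::align_of::<CachePadding>() == 128);
const _: () = assert!(std::mem::size_of::<CachePadding>() == 128);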

/// Convenience wrapper methods around atomic operations. Both start and end indices are packed
/// into a single atomic so that we can use the fastest and easiest to reason about `Relaxed`
/// ordering.
impl CachePadding {
    #[inline]
    fn new(n: usize) -> Self {
        CachePadding { atomic: AtomicUsize::new(n) }
    }

    #[inline]
    fn increment(&self) -> usize {
        self.atomic.fetch_add(1, Relaxed)
    }

    #[inline]
    fn load(&self) -> usize {
        self.atomic.load(Relaxed)
    }

    #[inline]
    fn store(&self, n: usize) {
        self.atomic.store(n, Relaxed);
    }

    #[inline]
    fn compare_exchange(&self, current: usize, new: usize) -> bool {
        self.atomic.compare_exchange(current, new, Relaxed, Relaxed).is_ok()
    }
}

#[inline]
fn pack(start: usize, end: usize) -> usize {
    (end << 32) | start
}

#[inline]
fn unpack(both: usize) -> (usize, usize) {
    (both & 0xffffffff, both >> 32)
}

/// Shares a monotonically increasing value between multiple threads.
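///
/// Usage sketch (assuming this module is reachable as `aoc::util::thread`), showing the start
/// value, the step and the stop flag:
///
/// ```
/// # use aoc::util::thread::AtomicIter;
/// let iter = AtomicIter::new(0, 2);
/// assert_eq!(iter.next(), Some(0));
/// assert_eq!(iter.next(), Some(2));
/// // Once any thread calls `stop`, every subsequent `next` returns `None`.
/// iter.stop();
/// assert_eq!(iter.next(), None);
/// ```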
pub struct AtomicIter {
    running: AtomicBool,
    index: AtomicU32,
    step: u32,
}

impl AtomicIter {
    pub fn new(start: u32, step: u32) -> Self {
        AtomicIter { running: AtomicBool::new(true), index: AtomicU32::from(start), step }
    }

    pub fn next(&self) -> Option<u32> {
        self.running.load(Relaxed).then(|| self.index.fetch_add(self.step, Relaxed))
    }

    pub fn stop(&self) {
        self.running.store(false, Relaxed);
    }
}
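
// A minimal test sketch (added here as an assumption, not part of the original file) covering the
// index packing, the work stealing iterator and the `AtomicIter` stop flag.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering::Relaxed};

    #[test]
    fn pack_unpack_roundtrip() {
        // `pack` stores `end` in the high 32 bits and `start` in the low 32 bits.
        assert_eq!(unpack(pack(3, 10)), (3, 10));
        assert_eq!(unpack(pack(3, 10) + 1), (4, 10));
    }

    #[test]
    fn parallel_iterator_visits_every_item_exactly_once() {
        let items: Vec<u32> = (0..1_000).collect();
        let total = AtomicU32::new(0);

        spawn_parallel_iterator(&items, |iter| {
            for &item in iter {
                total.fetch_add(item, Relaxed);
            }
        });

        // The sum of 0..1000 is 499,500 only if each item is processed exactly once.
        assert_eq!(total.load(Relaxed), 499_500);
    }

    #[test]
    fn atomic_iter_stops() {
        let iter = AtomicIter::new(5, 5);
        assert_eq!(iter.next(), Some(5));
        assert_eq!(iter.next(), Some(10));

        iter.stop();
        assert_eq!(iter.next(), None);
    }
}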