aoc/util/thread.rs

//! Utility methods to spawn a number of
//! [scoped](https://doc.rust-lang.org/stable/std/thread/fn.scope.html)
//! threads equal to the number of cores on the machine. Unlike normal threads, scoped threads
//! can borrow data from their environment.
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering::Relaxed};
use std::thread::*;

/// Usually the number of logical cores available to the process.
pub fn threads() -> usize {
    available_parallelism().unwrap().get()
}

/// Spawn `n` scoped threads, where `n` is the available parallelism.
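///
/// # Examples
///
/// A minimal sketch of borrowing local data from every worker thread. The crate path
/// `aoc::util::thread` is assumed from this file's location, so the example is not compiled.
///
/// ```ignore
/// use aoc::util::thread::{spawn, threads};
///
/// let data = vec![1, 2, 3];
///
/// // Every thread runs a copy of the same closure and can borrow `data` from this scope.
/// let results: Vec<usize> = spawn(|| data.iter().sum());
///
/// assert_eq!(results.len(), threads());
/// assert!(results.iter().all(|&total| total == 6));
/// ```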
pub fn spawn<F, R>(f: F) -> Vec<R>
where
    F: Fn() -> R + Copy + Send,
    R: Send,
{
    scope(|scope| {
        let mut handles = Vec::new();

        for _ in 0..threads() {
            let handle = scope.spawn(f);
            handles.push(handle);
        }

        handles.into_iter().flat_map(ScopedJoinHandle::join).collect()
    })
}

/// Spawns `n` scoped threads that each receive a
/// [work stealing](https://en.wikipedia.org/wiki/Work_stealing) iterator.
/// Work stealing is an efficient strategy, used by popular libraries such as
/// [rayon](https://github.com/rayon-rs/rayon), that keeps each CPU core busy when some items
/// take longer than others to process. Processing at different rates also happens on many
/// modern CPUs with
/// [heterogeneous performance and efficiency cores](https://en.wikipedia.org/wiki/ARM_big.LITTLE).
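///
/// # Examples
///
/// A minimal sketch where each worker sums whichever slice elements its iterator hands out.
/// The crate path `aoc::util::thread` is assumed from this file's location, so the example is
/// not compiled.
///
/// ```ignore
/// use aoc::util::thread::spawn_parallel_iterator;
///
/// let items: Vec<u64> = (1..=1_000).collect();
///
/// // Each worker returns a partial sum of the items it processed or stole.
/// let partial_sums = spawn_parallel_iterator(&items, |iter| iter.copied().sum::<u64>());
///
/// assert_eq!(partial_sums.iter().sum::<u64>(), 500_500);
/// ```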
pub fn spawn_parallel_iterator<F, R, T>(items: &[T], f: F) -> Vec<R>
where
    F: Fn(ParIter<'_, T>) -> R + Copy + Send,
    R: Send,
    T: Sync,
{
    let threads = threads();
    let size = items.len().div_ceil(threads);

    // Initially divide work as evenly as possible amongst each worker thread.
    let workers: Vec<_> = (0..threads)
        .map(|id| {
            let start = (id * size).min(items.len());
            let end = (start + size).min(items.len());
            CachePadding::new(pack(start, end))
        })
        .collect();
    let workers = workers.as_slice();

    scope(|scope| {
        let mut handles = Vec::new();

        for id in 0..threads {
            let handle = scope.spawn(move || f(ParIter { id, items, workers }));
            handles.push(handle);
        }

        handles.into_iter().flat_map(ScopedJoinHandle::join).collect()
    })
}

pub struct ParIter<'a, T> {
    id: usize,
    items: &'a [T],
    workers: &'a [CachePadding],
}

impl<'a, T> Iterator for ParIter<'a, T> {
    type Item = &'a T;

    fn next(&mut self) -> Option<&'a T> {
        // First try taking from our own queue.
        let worker = &self.workers[self.id];
        let current = worker.increment();
        let (start, end) = unpack(current);

        // There are still items to process.
        if start < end {
            return Some(&self.items[start]);
        }

        // Steal from another worker, [spinlocking](https://en.wikipedia.org/wiki/Spinlock)
        // until we acquire new items to process or there's nothing left to do.
        loop {
            // Find the worker with the most remaining items.
            let available = self
                .workers
                .iter()
                .filter_map(|other| {
                    let current = other.load();
                    let (start, end) = unpack(current);
                    let size = end.saturating_sub(start);

                    (size > 0).then_some((other, current, size))
                })
                .max_by_key(|&(_, _, size)| size);

            if let Some((other, current, size)) = available {
                // Split the work items into two roughly equal piles.
                let (start, end) = unpack(current);
                let middle = start + size.div_ceil(2);

                let next = pack(middle, end);
                let stolen = pack(start + 1, middle);

                // We could be preempted by another thread stealing or by the owning worker
                // thread finishing an item, so check indices are still unmodified.
                if other.compare_exchange(current, next) {
                    worker.store(stolen);
                    break Some(&self.items[start]);
                }
            } else {
                // No work remaining.
                break None;
            }
        }
    }
}

/// Intentionally force alignment to 128 bytes in a best-effort attempt to place each atomic
/// on its own cache line. This reduces contention and improves performance for common
/// CPU caching protocols such as [MESI](https://en.wikipedia.org/wiki/MESI_protocol).
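///
/// For example, on a 64 bit target the wrapped `AtomicUsize` is only 8 bytes, yet the alignment
/// attribute rounds the whole struct up to a presumed cache line size of 128 bytes:
///
/// ```ignore
/// assert_eq!(std::mem::align_of::<CachePadding>(), 128);
/// assert_eq!(std::mem::size_of::<CachePadding>(), 128);
/// ```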
#[repr(align(128))]
pub struct CachePadding {
    atomic: AtomicUsize,
}

/// Convenience wrapper methods around atomic operations. Both start and end indices are packed
/// into a single atomic so that we can use the fastest and easiest to reason about `Relaxed`
/// ordering.
impl CachePadding {
    #[inline]
    fn new(n: usize) -> Self {
        CachePadding { atomic: AtomicUsize::new(n) }
    }

    #[inline]
    fn increment(&self) -> usize {
        self.atomic.fetch_add(1, Relaxed)
    }

    #[inline]
    fn load(&self) -> usize {
        self.atomic.load(Relaxed)
    }

    #[inline]
    fn store(&self, n: usize) {
        self.atomic.store(n, Relaxed);
    }

    #[inline]
    fn compare_exchange(&self, current: usize, new: usize) -> bool {
        self.atomic.compare_exchange(current, new, Relaxed, Relaxed).is_ok()
    }
}

/// Packs `start` into the low 32 bits and `end` into the high 32 bits of a single `usize`,
/// assuming a 64 bit target and fewer than 2³² items.
/// For example, `pack(3, 10)` is `(10 << 32) | 3` and `unpack` recovers `(3, 10)`.
#[inline]
fn pack(start: usize, end: usize) -> usize {
    (end << 32) | start
}

/// Reverses `pack`, returning `(start, end)`.
#[inline]
fn unpack(both: usize) -> (usize, usize) {
    (both & 0xffffffff, both >> 32)
}

/// Shares a monotonically increasing value between multiple threads.
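///
/// # Examples
///
/// A minimal sketch of a parallel brute force search. The crate path `aoc::util::thread` and the
/// predicate are placeholders for illustration, so the example is not compiled.
///
/// ```ignore
/// use aoc::util::thread::{spawn, AtomicIter};
///
/// // Each worker claims candidates 0, 1, 2, ... until one of them finds a match.
/// let iter = AtomicIter::new(0, 1);
///
/// spawn(|| {
///     while let Some(n) = iter.next() {
///         if n > 0 && n % 12_345 == 0 {
///             iter.stop();
///             break;
///         }
///     }
/// });
/// ```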
pub struct AtomicIter {
    running: AtomicBool,
    index: AtomicU32,
    step: u32,
}

impl AtomicIter {
    /// Creates an iterator starting at `start` that hands out values `step` apart.
    pub fn new(start: u32, step: u32) -> Self {
        AtomicIter { running: AtomicBool::new(true), index: AtomicU32::from(start), step }
    }

    /// Claims the next value, or returns `None` once `stop` has been observed.
    pub fn next(&self) -> Option<u32> {
        self.running.load(Relaxed).then(|| self.index.fetch_add(self.step, Relaxed))
    }

    /// Signals all threads to stop handing out new values.
    pub fn stop(&self) {
        self.running.store(false, Relaxed);
    }
}