aoc/util/thread.rs
//! Utility methods to spawn a number of
//! [scoped](https://doc.rust-lang.org/stable/std/thread/fn.scope.html)
//! threads equal to the number of cores on the machine. Unlike normal threads, scoped threads
//! can borrow data from their environment.
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::thread::*;

// Usually the number of logical cores.
fn threads() -> usize {
    available_parallelism().unwrap().get()
}

/// Spawn `n` scoped threads, where `n` is the available parallelism.
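///
/// A minimal usage sketch, not taken from this crate's callers; the shared `AtomicUsize` counter
/// is only illustrative:
///
/// ```ignore
/// use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
///
/// let counter = AtomicUsize::new(0);
///
/// // Every thread runs the same closure and borrows `counter` from the enclosing scope.
/// spawn(|| {
///     counter.fetch_add(1, Relaxed);
/// });
///
/// // `spawn` only returns once all threads have finished, so the count is now final.
/// assert!(counter.load(Relaxed) > 0);
/// ```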
pub fn spawn<F>(f: F)
where
    F: Fn() + Copy + Send,
{
    scope(|scope| {
        for _ in 0..threads() {
            scope.spawn(f);
        }
    });
}

/// Spawns `n` scoped threads that each receive a
/// [work stealing](https://en.wikipedia.org/wiki/Work_stealing) iterator.
/// Work stealing is an efficient strategy that keeps each CPU core busy when some items take
/// longer than others to process, used by popular libraries such as
/// [rayon](https://github.com/rayon-rs/rayon).
/// Processing at different rates also happens on many modern CPUs with
/// [heterogeneous performance and efficiency cores](https://en.wikipedia.org/wiki/ARM_big.LITTLE).
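///
/// A minimal usage sketch; the input range and `AtomicU32` total are illustrative assumptions,
/// not part of this module:
///
/// ```ignore
/// use std::sync::atomic::{AtomicU32, Ordering::Relaxed};
///
/// let items: Vec<u32> = (1..=100).collect();
/// let total = AtomicU32::new(0);
///
/// // Each worker thread drains its own chunk of `items` first, then steals from other
/// // workers once that chunk is exhausted.
/// spawn_parallel_iterator(&items, |iter| {
///     for item in iter {
///         total.fetch_add(*item, Relaxed);
///     }
/// });
///
/// assert_eq!(total.load(Relaxed), 5050);
/// ```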
pub fn spawn_parallel_iterator<F, T>(items: &[T], f: F)
where
    F: Fn(ParIter<'_, T>) + Copy + Send,
    T: Sync,
{
    let threads = threads();
    let size = items.len().div_ceil(threads);

    // Initially divide work as evenly as possible amongst each worker thread.
    let workers: Vec<_> = (0..threads)
        .map(|id| {
            let start = (id * size).min(items.len());
            let end = (start + size).min(items.len());
            CachePadding::new(pack(start, end))
        })
        .collect();
    let workers = workers.as_slice();

    scope(|scope| {
        for id in 0..threads {
            scope.spawn(move || f(ParIter { id, items, workers }));
        }
    });
}

/// Work stealing iterator created by [`spawn_parallel_iterator`] that yields shared references
/// to items of the input slice.
pub struct ParIter<'a, T> {
    id: usize,
    items: &'a [T],
    workers: &'a [CachePadding],
}

impl<'a, T> Iterator for ParIter<'a, T> {
    type Item = &'a T;

    fn next(&mut self) -> Option<&'a T> {
        // First try taking from our own queue. `increment` returns the packed value *before*
        // the addition, so `start` is the index claimed by this call.
        let worker = &self.workers[self.id];
        let current = worker.increment();
        let (start, end) = unpack(current);

        // There are still items to process in our own queue.
        if start < end {
            return Some(&self.items[start]);
        }

        // Steal from another worker, [spinlocking](https://en.wikipedia.org/wiki/Spinlock)
        // until we acquire new items to process or there's nothing left to do.
        loop {
            // Find the worker with the most remaining items.
            let available = self
                .workers
                .iter()
                .filter_map(|other| {
                    let current = other.load();
                    let (start, end) = unpack(current);
                    let size = end.saturating_sub(start);
                    (size > 0).then_some((other, current, size))
                })
                .max_by_key(|t| t.2);

            if let Some((other, current, size)) = available {
                // Split the work items into two roughly equal piles.
                let (start, end) = unpack(current);
                let middle = start + size.div_ceil(2);

                let next = pack(middle, end);
                let stolen = pack(start + 1, middle);

                // We could be preempted by another thread stealing or by the owning worker
                // thread finishing an item, so check indices are still unmodified.
                if other.compare_exchange(current, next) {
                    worker.store(stolen);
                    break Some(&self.items[start]);
                }
            } else {
                // No work remaining.
                break None;
            }
        }
    }
}

/// Intentionally force alignment to 128 bytes to make a best-effort attempt to place each atomic
/// on its own cache line. This reduces contention and improves performance for common
/// CPU caching protocols such as [MESI](https://en.wikipedia.org/wiki/MESI_protocol).
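///
/// A quick layout check, included here only as an illustrative sketch:
///
/// ```ignore
/// use std::mem::{align_of, size_of};
///
/// // `#[repr(align(128))]` forces both the alignment and the rounded-up size to 128 bytes,
/// // so neighboring values in a slice never share a typical 64 or 128 byte cache line.
/// assert_eq!(align_of::<CachePadding>(), 128);
/// assert_eq!(size_of::<CachePadding>(), 128);
/// ```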
#[repr(align(128))]
pub struct CachePadding {
    atomic: AtomicUsize,
}

/// Convenience wrapper methods around atomic operations. Both start and end indices are packed
/// into a single atomic so that we can use `Relaxed` ordering, which is the fastest and the
/// easiest to reason about.
impl CachePadding {
    #[inline]
    fn new(n: usize) -> Self {
        CachePadding { atomic: AtomicUsize::new(n) }
    }

    #[inline]
    fn increment(&self) -> usize {
        self.atomic.fetch_add(1, Relaxed)
    }

    #[inline]
    fn load(&self) -> usize {
        self.atomic.load(Relaxed)
    }

    #[inline]
    fn store(&self, n: usize) {
        self.atomic.store(n, Relaxed);
    }

    #[inline]
    fn compare_exchange(&self, current: usize, new: usize) -> bool {
        self.atomic.compare_exchange(current, new, Relaxed, Relaxed).is_ok()
    }
}

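// Pack the `start` and `end` indices into the low and high halves of a single `usize`.
// This assumes a 64 bit platform and indices that fit into 32 bits.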
#[inline]
fn pack(start: usize, end: usize) -> usize {
    (end << 32) | start
}

#[inline]
fn unpack(both: usize) -> (usize, usize) {
    (both & 0xffffffff, both >> 32)
}