aoc/util/thread.rs

//! Utility methods to spawn a number of
//! [scoped](https://doc.rust-lang.org/stable/std/thread/fn.scope.html)
//! threads equal to the number of cores on the machine. Unlike normal threads, scoped threads
//! can borrow data from their environment.
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::thread::*;

// Usually the number of logical cores available to the process.
fn threads() -> usize {
    available_parallelism().unwrap().get()
}

/// Spawn `n` scoped threads, where `n` is the available parallelism.
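///
/// A usage sketch, illustrative only and not compiled as a doctest. Because the threads are
/// scoped, the closure can borrow `finished` directly from the caller's stack frame without
/// needing an `Arc`:
///
/// ```ignore
/// use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
///
/// let finished = AtomicUsize::new(0);
/// spawn(|| { finished.fetch_add(1, Relaxed); });
///
/// // All threads are joined before `spawn` returns.
/// assert!(finished.load(Relaxed) > 0);
/// ```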
pub fn spawn<F>(f: F)
where
    F: Fn() + Copy + Send,
{
    scope(|scope| {
        for _ in 0..threads() {
            scope.spawn(f);
        }
    });
}

/// Spawns `n` scoped threads that each receive a
/// [work stealing](https://en.wikipedia.org/wiki/Work_stealing) iterator.
/// Work stealing is an efficient strategy that keeps each CPU core busy when some items take longer
/// than others to process. It is used by popular libraries such as [rayon](https://github.com/rayon-rs/rayon).
/// Processing at different rates also happens on many modern CPUs with
/// [heterogeneous performance and efficiency cores](https://en.wikipedia.org/wiki/ARM_big.LITTLE).
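///
/// A usage sketch, illustrative only and not compiled as a doctest. Each item is handed to
/// exactly one worker thread, so the atomic total below sees every value once:
///
/// ```ignore
/// use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
///
/// let items: Vec<u64> = (1..=1000).collect();
/// let total = AtomicU64::new(0);
///
/// spawn_parallel_iterator(&items, |iter| {
///     for item in iter {
///         total.fetch_add(*item, Relaxed);
///     }
/// });
///
/// assert_eq!(total.load(Relaxed), 500500);
/// ```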
pub fn spawn_parallel_iterator<F, T>(items: &[T], f: F)
where
    F: Fn(ParIter<'_, T>) + Copy + Send,
    T: Sync,
{
    let threads = threads();
    let size = items.len().div_ceil(threads);

    // Initially divide work as evenly as possible amongst each worker thread.
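    // For example (illustrative numbers only): 10 items split amongst 4 threads gives
    // `size = 3`, so the initial ranges are [0, 3), [3, 6), [6, 9) and [9, 10).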
    let workers: Vec<_> = (0..threads)
        .map(|id| {
            let start = (id * size).min(items.len());
            let end = (start + size).min(items.len());
            CachePadding::new(pack(start, end))
        })
        .collect();
    let workers = workers.as_slice();

    scope(|scope| {
        for id in 0..threads {
            scope.spawn(move || f(ParIter { id, items, workers }));
        }
    });
}

pub struct ParIter<'a, T> {
    id: usize,
    items: &'a [T],
    workers: &'a [CachePadding],
}

impl<'a, T> Iterator for ParIter<'a, T> {
    type Item = &'a T;

    fn next(&mut self) -> Option<&'a T> {
        // First try taking from our own queue.
        let worker = &self.workers[self.id];
        let current = worker.increment();
        let (start, end) = unpack(current);

        // There are still items to process.
        if start < end {
            return Some(&self.items[start]);
        }

        // Steal from another worker, [spinlocking](https://en.wikipedia.org/wiki/Spinlock)
        // until we acquire new items to process or there's nothing left to do.
        loop {
            // Find worker with the most remaining items.
            let available = self
                .workers
                .iter()
                .filter_map(|other| {
                    let current = other.load();
                    let (start, end) = unpack(current);
                    let size = end.saturating_sub(start);

                    (size > 0).then_some((other, current, size))
                })
                .max_by_key(|t| t.2);

            if let Some((other, current, size)) = available {
                // Split the work items into two roughly equal piles.
                let (start, end) = unpack(current);
                let middle = start + size.div_ceil(2);

                let next = pack(middle, end);
                let stolen = pack(start + 1, middle);
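
                // For example (illustrative numbers only): stealing from a range [4, 12) gives
                // `middle = 8`, so the other worker keeps [8, 12), this thread queues [5, 8)
                // and returns `items[4]` immediately below.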

                // We could be preempted by another thread stealing or by the owning worker
                // thread finishing an item, so check indices are still unmodified.
                if other.compare_exchange(current, next) {
                    worker.store(stolen);
                    break Some(&self.items[start]);
                }
            } else {
                // No work remaining.
                break None;
            }
        }
    }
}

/// Intentionally force alignment to 128 bytes to make a best effort attempt to place each atomic
/// on its own cache line. This reduces contention and improves performance for common
/// CPU caching protocols such as [MESI](https://en.wikipedia.org/wiki/MESI_protocol).
#[repr(align(128))]
pub struct CachePadding {
    atomic: AtomicUsize,
}
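
// A small compile time sanity check, an illustrative addition rather than anything load bearing:
// the 128 byte alignment requested above should be what the type actually reports.
const _: () = assert!(std::mem::align_of::<CachePadding>() == 128);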

/// Convenience wrapper methods around atomic operations. Both start and end indices are packed
/// into a single atomic so that we can use the fastest and easiest to reason about `Relaxed`
/// ordering.
impl CachePadding {
    #[inline]
    fn new(n: usize) -> Self {
        CachePadding { atomic: AtomicUsize::new(n) }
    }

    #[inline]
    fn increment(&self) -> usize {
        self.atomic.fetch_add(1, Relaxed)
    }

    #[inline]
    fn load(&self) -> usize {
        self.atomic.load(Relaxed)
    }

    #[inline]
    fn store(&self, n: usize) {
        self.atomic.store(n, Relaxed);
    }

    #[inline]
    fn compare_exchange(&self, current: usize, new: usize) -> bool {
        self.atomic.compare_exchange(current, new, Relaxed, Relaxed).is_ok()
    }
}

// Pack the start index into the low 32 bits and the end index into the high 32 bits of a single
// `usize` so that both can be read and updated with one atomic operation. This assumes a 64 bit
// platform and fewer than 2^32 items. Adding 1 to the packed value (see `increment`) advances
// only the start index.
#[inline]
fn pack(start: usize, end: usize) -> usize {
    (end << 32) | start
}

#[inline]
fn unpack(both: usize) -> (usize, usize) {
    (both & 0xffffffff, both >> 32)
}
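
// A minimal test sketch (not part of the original module) exercising the internal packing
// helpers; names and values are illustrative only.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn pack_unpack_round_trip() {
        assert_eq!(unpack(pack(3, 10)), (3, 10));
    }

    #[test]
    fn increment_advances_only_the_start_index() {
        let worker = CachePadding::new(pack(3, 10));
        // `increment` returns the previous packed value, like `fetch_add`.
        assert_eq!(unpack(worker.increment()), (3, 10));
        assert_eq!(unpack(worker.load()), (4, 10));
    }
}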