1use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
6use std::thread::*;
7
/// Returns the number of worker threads to use.
///
/// Falls back to 1 instead of panicking when the available parallelism
/// cannot be determined (unsupported platform, permission error, …),
/// so the thread-pool helpers below always make progress.
fn threads() -> usize {
    available_parallelism().map_or(1, |n| n.get())
}
12
13pub fn spawn<F>(f: F)
15where
16 F: Fn() + Copy + Send,
17{
18 scope(|scope| {
19 for _ in 0..threads() {
20 scope.spawn(f);
21 }
22 });
23}
24
25pub fn spawn_parallel_iterator<F, T>(items: &[T], f: F)
32where
33 F: Fn(ParIter<'_, T>) + Copy + Send,
34 T: Sync,
35{
36 let threads = threads();
37 let size = items.len().div_ceil(threads);
38
39 let workers: Vec<_> = (0..threads)
41 .map(|id| {
42 let start = (id * size).min(items.len());
43 let end = (start + size).min(items.len());
44 CachePadding::new(pack(start, end))
45 })
46 .collect();
47 let workers = workers.as_slice();
48
49 scope(|scope| {
50 for id in 0..threads {
51 scope.spawn(move || f(ParIter { id, items, workers }));
52 }
53 });
54}
55
/// A work-stealing iterator over a shared slice.
///
/// Each thread owns one slot in `workers` (indexed by `id`) holding its
/// packed (start, end) range. When its own range is exhausted, `next`
/// steals half of the largest remaining range from another worker.
pub struct ParIter<'a, T> {
    // Index of this thread's slot in `workers`.
    id: usize,
    // The full shared slice being iterated by all workers.
    items: &'a [T],
    // One padded atomic range per worker, shared between all iterators.
    workers: &'a [CachePadding],
}
61
impl<'a, T> Iterator for ParIter<'a, T> {
    type Item = &'a T;

    /// Yields the next item from this worker's own range, or steals
    /// half of the largest remaining range from another worker once the
    /// own range is drained. Returns `None` only when every worker's
    /// range is empty.
    fn next(&mut self) -> Option<&'a T> {
        // Fast path: bump our own packed (start, end) counter. The
        // increment is unconditional; if our range is already empty the
        // stale counter is simply overwritten by `store` after a
        // successful steal below.
        let worker = &self.workers[self.id];
        let current = worker.increment();
        let (start, end) = unpack(current);

        if start < end {
            return Some(&self.items[start]);
        }

        // Slow path: work stealing. Retry until either a steal succeeds
        // or no worker has any items left.
        loop {
            // Snapshot every worker's range and pick the victim with
            // the most remaining work.
            let available = self
                .workers
                .iter()
                .filter_map(|other| {
                    let current = other.load();
                    let (start, end) = unpack(current);
                    let size = end.saturating_sub(start);

                    (size > 0).then_some((other, current, size))
                })
                .max_by_key(|t| t.2);

            if let Some((other, current, size)) = available {
                let (start, end) = unpack(current);
                // Split point: the victim keeps the upper half, we take
                // the lower half (rounded up, so size == 1 still steals
                // one item).
                let middle = start + size.div_ceil(2);

                // `next` is the victim's shrunken range; `stolen` is our
                // new range minus `items[start]`, which we claim here.
                let next = pack(middle, end);
                let stolen = pack(start + 1, middle);

                // The CAS fails if the victim consumed items or was
                // itself stolen from since the snapshot; on failure,
                // re-scan from scratch.
                if other.compare_exchange(current, next) {
                    worker.store(stolen);
                    break Some(&self.items[start]);
                }
            } else {
                // No worker has remaining items: iteration is complete.
                break None;
            }
        }
    }
}
113
/// An atomic counter over-aligned to 128 bytes so that adjacent
/// workers' counters land on separate cache lines, preventing false
/// sharing between threads. (128 covers two typical 64-byte lines —
/// presumably chosen to match adjacent-line prefetchers.)
#[repr(align(128))]
pub struct CachePadding {
    atomic: AtomicUsize,
}
121
122impl CachePadding {
126 #[inline]
127 fn new(n: usize) -> Self {
128 CachePadding { atomic: AtomicUsize::new(n) }
129 }
130
131 #[inline]
132 fn increment(&self) -> usize {
133 self.atomic.fetch_add(1, Relaxed)
134 }
135
136 #[inline]
137 fn load(&self) -> usize {
138 self.atomic.load(Relaxed)
139 }
140
141 #[inline]
142 fn store(&self, n: usize) {
143 self.atomic.store(n, Relaxed);
144 }
145
146 #[inline]
147 fn compare_exchange(&self, current: usize, new: usize) -> bool {
148 self.atomic.compare_exchange(current, new, Relaxed, Relaxed).is_ok()
149 }
150}
151
/// Packs two indices into one `usize`: `end` in the upper 32 bits and
/// `start` in the lower 32 bits, so a (start, end) range can be updated
/// with a single atomic operation.
///
/// Requires a 64-bit `usize` and both indices < 2^32; larger values
/// would silently corrupt the packed representation, so debug builds
/// assert the invariant.
#[inline]
fn pack(start: usize, end: usize) -> usize {
    debug_assert!(
        start <= u32::MAX as usize && end <= u32::MAX as usize,
        "packed indices must fit in 32 bits"
    );
    (end << 32) | start
}
156
/// Splits a packed `usize` back into `(start, end)`: `start` from the
/// lower 32 bits, `end` from the upper 32 bits. Inverse of `pack`.
#[inline]
fn unpack(both: usize) -> (usize, usize) {
    let start = both & (u32::MAX as usize);
    let end = both >> 32;
    (start, end)
}