//! Utilities for spawning scoped worker threads and splitting work between them.

use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering::Relaxed};
use std::thread::*;

/// Returns the number of threads available to the program,
/// usually the number of logical cores.
pub fn threads() -> usize {
    available_parallelism().unwrap().get()
}

/// Spawns the same closure once per available core using scoped threads
/// and collects one result per thread.
pub fn spawn<F, R>(f: F) -> Vec<R>
where
    F: Fn() -> R + Copy + Send,
    R: Send,
{
    scope(|scope| {
        let mut handles = Vec::new();

        for _ in 0..threads() {
            let handle = scope.spawn(f);
            handles.push(handle);
        }

        handles.into_iter().flat_map(ScopedJoinHandle::join).collect()
    })
}

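// A minimal usage sketch, not from the original source: `spawn` runs the same
// closure once per thread, so the result vector has `threads()` entries.
#[test]
fn spawn_runs_closure_once_per_thread() {
    let results = spawn(|| 1_usize);
    assert_eq!(results.len(), threads());
    assert_eq!(results.iter().sum::<usize>(), threads());
}
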
/// Splits `items` into one contiguous batch per thread and runs `f` on each batch.
/// Each worker's remaining `(start, end)` range is packed into a single atomic `usize`;
/// workers that finish early steal half of the largest remaining range.
pub fn spawn_parallel_iterator<F, R, T>(items: &[T], f: F) -> Vec<R>
where
    F: Fn(ParIter<'_, T>) -> R + Copy + Send,
    R: Send,
    T: Sync,
{
    let threads = threads();
    let size = items.len().div_ceil(threads);

    // Initial range for each worker, cache padded to prevent false sharing.
    let workers: Vec<_> = (0..threads)
        .map(|id| {
            let start = (id * size).min(items.len());
            let end = (start + size).min(items.len());
            CachePadding::new(pack(start, end))
        })
        .collect();
    let workers = workers.as_slice();

    scope(|scope| {
        let mut handles = Vec::new();

        for id in 0..threads {
            let handle = scope.spawn(move || f(ParIter { id, items, workers }));
            handles.push(handle);
        }

        handles.into_iter().flat_map(ScopedJoinHandle::join).collect()
    })
}

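// A minimal usage sketch, not from the original source: each worker sums its
// share of the slice and the partial sums are combined afterwards. The input
// values are arbitrary illustrative data.
#[test]
fn parallel_iterator_visits_every_item_once() {
    let items: Vec<u32> = (0..1_000).collect();
    let partial_sums = spawn_parallel_iterator(&items, |iter| iter.sum::<u32>());

    assert_eq!(partial_sums.len(), threads());
    assert_eq!(partial_sums.iter().sum::<u32>(), items.iter().sum::<u32>());
}
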
pub struct ParIter<'a, T> {
    id: usize,
    items: &'a [T],
    workers: &'a [CachePadding],
}

impl<'a, T> Iterator for ParIter<'a, T> {
    type Item = &'a T;

    fn next(&mut self) -> Option<&'a T> {
        // Fast path: take the next item from this worker's own range.
        let worker = &self.workers[self.id];
        let current = worker.increment();
        let (start, end) = unpack(current);

        if start < end {
            return Some(&self.items[start]);
        }

        // Slow path: our range is exhausted, so steal work from another worker.
        loop {
            // Find the worker with the most remaining items.
            let available = self
                .workers
                .iter()
                .filter_map(|other| {
                    let current = other.load();
                    let (start, end) = unpack(current);
                    let size = end.saturating_sub(start);

                    (size > 0).then_some((other, current, size))
                })
                .max_by_key(|&(_, _, size)| size);

            if let Some((other, current, size)) = available {
                // Split the victim's range in half, keeping the first item for ourselves.
                let (start, end) = unpack(current);
                let middle = start + size.div_ceil(2);

                let next = pack(middle, end);
                let stolen = pack(start + 1, middle);

                // Only commit the steal if the victim hasn't progressed in the meantime.
                if other.compare_exchange(current, next) {
                    worker.store(stolen);
                    break Some(&self.items[start]);
                }
            } else {
                // No work left anywhere.
                break None;
            }
        }
    }
}

/// Aligns the atomic counter to a 128 byte boundary so that each worker's range
/// sits on its own cache line, preventing false sharing between cores.
#[repr(align(128))]
pub struct CachePadding {
    atomic: AtomicUsize,
}

impl CachePadding {
    #[inline]
    fn new(n: usize) -> Self {
        CachePadding { atomic: AtomicUsize::new(n) }
    }

    #[inline]
    fn increment(&self) -> usize {
        self.atomic.fetch_add(1, Relaxed)
    }

    #[inline]
    fn load(&self) -> usize {
        self.atomic.load(Relaxed)
    }

    #[inline]
    fn store(&self, n: usize) {
        self.atomic.store(n, Relaxed);
    }

    #[inline]
    fn compare_exchange(&self, current: usize, new: usize) -> bool {
        self.atomic.compare_exchange(current, new, Relaxed, Relaxed).is_ok()
    }
}

/// Packs a `(start, end)` pair into one `usize`: `start` in the low 32 bits and
/// `end` in the high 32 bits, so that adding 1 advances only `start`.
/// Assumes a 64-bit target and indices that fit in 32 bits.
#[inline]
fn pack(start: usize, end: usize) -> usize {
    (end << 32) | start
}

#[inline]
fn unpack(both: usize) -> (usize, usize) {
    (both & 0xffffffff, both >> 32)
}

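// A quick sanity check, added for illustration: `pack`/`unpack` round-trip and
// adding 1 to a packed value advances only the `start` half, which is what
// `CachePadding::increment` relies on.
#[test]
fn pack_and_unpack_round_trip() {
    let packed = pack(3, 10);
    assert_eq!(unpack(packed), (3, 10));
    assert_eq!(unpack(packed + 1), (4, 10));
}
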
/// Shared counter for open-ended parallel searches: `next` hands out indices
/// `start`, `start + step`, `start + 2 * step`, ... until some thread calls `stop`.
pub struct AtomicIter {
    running: AtomicBool,
    index: AtomicU32,
    step: u32,
}

impl AtomicIter {
    pub fn new(start: u32, step: u32) -> Self {
        AtomicIter { running: AtomicBool::new(true), index: AtomicU32::from(start), step }
    }

    pub fn next(&self) -> Option<u32> {
        self.running.load(Relaxed).then(|| self.index.fetch_add(self.step, Relaxed))
    }

    pub fn stop(&self) {
        self.running.store(false, Relaxed);
    }
}
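
// A minimal usage sketch, not from the original source: worker threads pull
// indices from a shared `AtomicIter` until one of them finds a value past an
// arbitrary threshold and calls `stop`, which ends the other threads' loops.
#[test]
fn atomic_iter_stops_all_workers() {
    let iter = AtomicIter::new(0, 1);
    let found = AtomicU32::new(0);

    spawn(|| {
        while let Some(n) = iter.next() {
            if n >= 10_000 {
                found.fetch_max(n, Relaxed);
                iter.stop();
            }
        }
    });

    assert!(found.load(Relaxed) >= 10_000);
}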