//! Utilities for spawning scoped threads: one worker per available core, a simple
//! work-stealing parallel iterator over a slice, and an atomic counter iterator.
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering::Relaxed};
use std::thread::*;

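/// Number of worker threads to spawn: the available parallelism reported by the OS,
/// usually the number of logical cores.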
pub fn threads() -> usize {
    available_parallelism().unwrap().get()
}

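/// Spawns one scoped thread per available core, runs `f` on each, and collects the
/// results once all threads have finished.
///
/// A minimal usage sketch (not part of the original source); each worker just returns a
/// constant here:
///
/// ```ignore
/// let results: Vec<u32> = spawn(|| 1);
/// assert_eq!(results.len(), threads());
/// ```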
pub fn spawn<F, R>(f: F) -> Vec<R>
where
    F: Fn() -> R + Copy + Send,
    R: Send,
{
    scope(|scope| {
        let mut handles = Vec::new();

        for _ in 0..threads() {
            let handle = scope.spawn(f);
            handles.push(handle);
        }

        handles.into_iter().flat_map(ScopedJoinHandle::join).collect()
    })
}

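/// Splits `items` into one contiguous chunk per thread and runs `f` on each chunk via a
/// work-stealing [`ParIter`], so threads that finish early take items from busier workers.
///
/// A minimal usage sketch (not part of the original source), summing a slice in parallel:
///
/// ```ignore
/// let items: Vec<u64> = (0..1_000_000).collect();
/// let partial: Vec<u64> = spawn_parallel_iterator(&items, |iter| iter.sum::<u64>());
/// let total: u64 = partial.into_iter().sum();
/// assert_eq!(total, 1_000_000 * 999_999 / 2);
/// ```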
pub fn spawn_parallel_iterator<F, R, T>(items: &[T], f: F) -> Vec<R>
where
    F: Fn(ParIter<'_, T>) -> R + Copy + Send,
    R: Send,
    T: Sync,
{
    let threads = threads();
    let size = items.len().div_ceil(threads);

    // Assign each worker a contiguous chunk of roughly equal size, stored as a packed
    // (start, end) pair so that it can be read and updated with a single atomic operation.
    let workers: Vec<_> = (0..threads)
        .map(|id| {
            let start = (id * size).min(items.len());
            let end = (start + size).min(items.len());
            CachePadding::new(pack(start, end))
        })
        .collect();
    let workers = workers.as_slice();

    scope(|scope| {
        let mut handles = Vec::new();

        for id in 0..threads {
            let handle = scope.spawn(move || f(ParIter { id, items, workers }));
            handles.push(handle);
        }

        handles.into_iter().flat_map(ScopedJoinHandle::join).collect()
    })
}

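/// Work-stealing iterator over a shared slice. Each worker owns a packed `(start, end)`
/// range; when its own range is exhausted it steals half of the largest remaining range
/// from another worker.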
pub struct ParIter<'a, T> {
    id: usize,
    items: &'a [T],
    workers: &'a [CachePadding],
}

impl<'a, T> Iterator for ParIter<'a, T> {
    type Item = &'a T;

    fn next(&mut self) -> Option<&'a T> {
        // Fast path: claim the next index from our own range with a single atomic add.
        let worker = &self.workers[self.id];
        let current = worker.increment();
        let (start, end) = unpack(current);

        if start < end {
            return Some(&self.items[start]);
        }

        // Slow path: our range is exhausted, so try to steal work from another worker.
        loop {
            // Find the worker with the most remaining items, or finish if there are none.
            let (other, current, size) = self
                .workers
                .iter()
                .filter_map(|other| {
                    let current = other.load();
                    let (start, end) = unpack(current);
                    let size = end.saturating_sub(start);

                    (size > 0).then_some((other, current, size))
                })
                .max_by_key(|&(_, _, size)| size)?;

            // Split the victim's range in two: it keeps the second half, we take the
            // first half and immediately consume the first stolen item.
            let (start, end) = unpack(current);
            let middle = start + size.div_ceil(2);

            let next = pack(middle, end);
            let stolen = pack(start + 1, middle);

            // Only one thief can win the exchange; on failure the victim's range changed
            // underneath us, so retry with fresh values.
            if other.compare_exchange(current, next) {
                worker.store(stolen);
                break Some(&self.items[start]);
            }
        }
    }
}

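/// Wrapper around an [`AtomicUsize`] aligned to 128 bytes so that each worker's counter
/// lives on its own cache line, avoiding false sharing between threads.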
#[repr(align(128))]
pub struct CachePadding {
    atomic: AtomicUsize,
}

impl CachePadding {
    #[inline]
    fn new(n: usize) -> Self {
        CachePadding { atomic: AtomicUsize::new(n) }
    }

    #[inline]
    fn increment(&self) -> usize {
        self.atomic.fetch_add(1, Relaxed)
    }

    #[inline]
    fn load(&self) -> usize {
        self.atomic.load(Relaxed)
    }

    #[inline]
    fn store(&self, n: usize) {
        self.atomic.store(n, Relaxed);
    }

    #[inline]
    fn compare_exchange(&self, current: usize, new: usize) -> bool {
        self.atomic.compare_exchange(current, new, Relaxed, Relaxed).is_ok()
    }
}

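// `pack`/`unpack` squeeze a `(start, end)` index pair into a single `usize` so that the
// pair can be read and updated atomically. This assumes a 64-bit `usize` and slices of
// at most `u32::MAX` items.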
#[inline]
fn pack(start: usize, end: usize) -> usize {
    (end << 32) | start
}

#[inline]
fn unpack(both: usize) -> (usize, usize) {
    (both & 0xffffffff, both >> 32)
}

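/// Shared counter that hands out the values `start`, `start + step`, `start + 2 * step`, ...
/// to any number of threads until [`stop`](AtomicIter::stop) is called.
///
/// A minimal single-threaded usage sketch (not part of the original source):
///
/// ```ignore
/// let iter = AtomicIter::new(0, 2);
/// assert_eq!(iter.next(), Some(0));
/// assert_eq!(iter.next(), Some(2));
/// iter.stop();
/// assert_eq!(iter.next(), None);
/// ```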
pub struct AtomicIter {
    running: AtomicBool,
    index: AtomicU32,
    step: u32,
}

impl AtomicIter {
    pub fn new(start: u32, step: u32) -> Self {
        AtomicIter { running: AtomicBool::new(true), index: AtomicU32::from(start), step }
    }

    pub fn next(&self) -> Option<u32> {
        self.running.load(Relaxed).then(|| self.index.fetch_add(self.step, Relaxed))
    }

    pub fn stop(&self) {
        self.running.store(false, Relaxed);
    }
}