aoc/year2017/
day15.rs

1//! # Dueling Generators
2//!
3//! Multithreaded approach using worker threads to generate batches of numbers for judging.
4//! Part one can be checked in parallel, but part two must be sent to a single thread as the
5//! indices must be checked in order.
6//!
7//! The sequence of numbers are [modular exponentiation](https://en.wikipedia.org/wiki/Modular_exponentiation)
8//! so we can jump to any location in the sequence, without needing to know the previous numbers.
9use crate::util::hash::*;
10use crate::util::iter::*;
11use crate::util::math::*;
12use crate::util::parse::*;
13use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
14use std::sync::mpsc::{Receiver, Sender, channel};
15use std::thread;
16
17const PART_ONE: usize = 40_000_000;
18const PART_TWO: usize = 5_000_000;
19const BLOCK: usize = 50_000;
20
21type Input = (u32, u32);
22
23/// State shared between all threads.
24pub struct Shared {
25    first: usize,
26    second: usize,
27    start: AtomicUsize,
28    done: AtomicBool,
29}
30
31/// Generated numbers from `start` to `start + BLOCK`.
32struct Block {
33    start: usize,
34    ones: u32,
35    fours: Vec<u16>,
36    eights: Vec<u16>,
37}
38
39pub fn parse(input: &str) -> Input {
40    let [first, second] = input.iter_unsigned().chunk::<2>().next().unwrap();
41    let shared = Shared { first, second, start: AtomicUsize::new(0), done: AtomicBool::new(false) };
42    let (tx, rx) = channel();
43
44    thread::scope(|scope| {
45        // Use all cores except one to generate blocks of numbers for judging.
46        for _ in 0..thread::available_parallelism().unwrap().get() - 1 {
47            scope.spawn(|| sender(&shared, &tx));
48        }
49        // Judge batches serially.
50        receiver(&shared, &rx)
51    })
52}
53
54pub fn part1(input: &Input) -> u32 {
55    input.0
56}
57
58pub fn part2(input: &Input) -> u32 {
59    input.1
60}
61
62fn sender(shared: &Shared, tx: &Sender<Block>) {
63    while !shared.done.load(Ordering::Relaxed) {
64        // Start at any point in the sequence using modular exponentiation.
65        let start = shared.start.fetch_add(BLOCK, Ordering::Relaxed);
66        let mut first = shared.first * 16807.mod_pow(start, 0x7fffffff);
67        let mut second = shared.second * 48271.mod_pow(start, 0x7fffffff);
68
69        // Estimate capacity at one quarter or one eight, plus a little extra for variance.
70        let mut ones = 0;
71        let mut fours = Vec::with_capacity((BLOCK * 30) / 100);
72        let mut eights = Vec::with_capacity((BLOCK * 15) / 100);
73
74        // Check part one pairs immediately while queueing part two pairs.
75        for _ in 0..BLOCK {
76            first = (first * 16807) % 0x7fffffff;
77            second = (second * 48271) % 0x7fffffff;
78
79            let left = first as u16;
80            let right = second as u16;
81
82            if left == right {
83                ones += 1;
84            }
85            if left % 4 == 0 {
86                fours.push(left);
87            }
88            if right % 8 == 0 {
89                eights.push(right);
90            }
91        }
92
93        let _unused = tx.send(Block { start, ones, fours, eights });
94    }
95}
96
97fn receiver(shared: &Shared, rx: &Receiver<Block>) -> (u32, u32) {
98    let mut remaining = PART_TWO;
99    let mut part_two = 0;
100
101    let mut required = 0;
102    let mut out_of_order = FastMap::new();
103    let mut blocks = Vec::new();
104
105    let mut fours_block = 0;
106    let mut fours_index = 0;
107
108    let mut eights_block = 0;
109    let mut eights_index = 0;
110
111    while remaining > 0 {
112        // Blocks could be received in any order, as there's no guarantee threads will finish
113        // processing at the same time. The `start` field of the block defines the order they
114        // must be added to the vec.
115        while fours_block >= blocks.len() || eights_block >= blocks.len() {
116            let block = rx.recv().unwrap();
117            out_of_order.insert(block.start, block);
118
119            while let Some(next) = out_of_order.remove(&required) {
120                blocks.push(next);
121                required += BLOCK;
122            }
123        }
124
125        // Iterate over the minimum block size or numbers left to check.
126        let fours = &blocks[fours_block].fours;
127        let eights = &blocks[eights_block].eights;
128        let iterations = remaining.min(fours.len() - fours_index).min(eights.len() - eights_index);
129
130        remaining -= iterations;
131
132        for _ in 0..iterations {
133            if fours[fours_index] == eights[eights_index] {
134                part_two += 1;
135            }
136            fours_index += 1;
137            eights_index += 1;
138        }
139
140        // If we've checked all the numbers in a block, advance to the next one.
141        // This may require waiting for a worker thread to create it first.
142        if fours_index == fours.len() {
143            fours_block += 1;
144            fours_index = 0;
145        }
146        if eights_index == eights.len() {
147            eights_block += 1;
148            eights_index = 0;
149        }
150    }
151
152    // Just in case, make sure we have enough blocks for part one.
153    while required < PART_ONE {
154        let block = rx.recv().unwrap();
155        out_of_order.insert(block.start, block);
156
157        while let Some(next) = out_of_order.remove(&required) {
158            blocks.push(next);
159            required += BLOCK;
160        }
161    }
162
163    // Signal worker thread to finish.
164    shared.done.store(true, Ordering::Relaxed);
165
166    // Return results.
167    let part_one = blocks.iter().take(PART_ONE / BLOCK).map(|p| p.ones).sum();
168    (part_one, part_two)
169}