Fastest known solution: 0.577s (8 core Zen2); C with heavy AVX2 #710
austindonisan
started this conversation in
Show and tell
Very impressive! Would you happen to know by any chance which packages I need to install on CentOS Stream for the compiler tool chain required to build your solution?
https://github.com/austindonisan/1brc/tree/master
This implementation maximizes AVX2 effectiveness by processing rows in parallel as much as possible. Additionally, fork() is used instead of threads to allow parallel unmapping, and optionally to skip unmapping entirely by leaving orphan processes.
Performance works out to about 14.5 cycles/row, with 2.8 instructions/cycle. On a Zen2 GCP instance with 8 cores, leaving orphan processes, hyperfine runs are consistently below 0.580s.
Individual runs complete in 0.520 - 0.525s.
Reaping all children only slows it down by about 20ms on repeated runs.
Streams
Each worker process is given a contiguous chunk of the input file to process. Each worker then splits its chunk into 8 "streams." In the main loop the worker processes these streams in parallel, one row at a time from each stream.
Each stream is advanced by reading 32 bytes (the size of an AVX register) and processing it. These 32 byte chunks each contain 1 city name, and bytes starting at the first semicolon are masked to null. Each stream is advanced exactly 1 row in each loop iteration, regardless of the city name length.
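As a rough illustration of the masking step, here is a scalar sketch. It is not the AVX2 code from the repository: the function name `mask_city` and the byte-by-byte loop are assumptions made for readability, and the real version would do this with vector compares on a whole 32-byte register.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

enum { CHUNK = 32 };   /* one AVX2 register's worth of bytes */

/* Copies a 32-byte chunk, zeroing every byte from the first ';' onward.
 * Returns the index of the ';', or CHUNK if the chunk contains none
 * (which is what the text calls a "long" city). */
static int mask_city(const uint8_t in[CHUNK], uint8_t out[CHUNK]) {
    assert(in != NULL && out != NULL);
    int semi = CHUNK;
    for (int i = 0; i < CHUNK; i++) {
        if (semi == CHUNK && in[i] == ';') semi = i;
        out[i] = (i < semi) ? in[i] : 0;
    }
    return semi;
}
```

Because everything after the name is nulled, two chunks holding the same city compare equal as 32-byte values no matter what follows the semicolon.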
From these 8 registers it computes 1 AVX register containing 8 32-bit hashes and 1 AVX register containing 8 32-bit temperature values. It loads 8 sets of measurements from a hash table and updates the 8 sets of sum/count/min/max in parallel.
Each stream independently parses all of its allocated work, causing them to end at different times. This is handled by pointing finished streams at a block of dummy data. This dummy data is then filtered out at the printing step.
There was some tension between using 4 and 8 streams: fitting 8 values in a 32 byte register is awkward for some of the steps, and keeping track of 8 values through each step causes a lot of register pressure. However the extra shuffling and register spills/reloads are more than made up for by processing 2x the rows per iteration, and by the much fuller pipelines that gives.
One challenge was keeping 8 stream pointers, since 8 64-bit pointers don't fit in an AVX register. This is handled by using 32-bit offsets with a shared 64-bit base pointer. If the data being processed is more than 4GB/worker it's chunked into 4GB sections.
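The offset scheme can be sketched roughly as follows. The struct and function names (`stream_set`, `stream_ptr`, `section_count`) are hypothetical and only meant to show the layout: eight 32-bit offsets take 256 bits, which is exactly one AVX register, while the base pointer lives in an ordinary 64-bit register.

```c
#include <assert.h>
#include <stdint.h>

enum { STREAMS = 8 };
#define SECTION_MAX (UINT64_C(1) << 32)   /* bytes reachable with a 32-bit offset */

struct stream_set {
    const char *base;           /* shared 64-bit base pointer */
    uint32_t    off[STREAMS];   /* 8 x 32-bit offsets = 256 bits */
};

/* Current read position of stream i. */
static const char *stream_ptr(const struct stream_set *s, int i) {
    assert(i >= 0 && i < STREAMS);
    return s->base + s->off[i];
}

/* Number of sections (each at most 4 GiB) needed to cover a worker's chunk. */
static uint64_t section_count(uint64_t chunk_bytes) {
    return (chunk_bytes + SECTION_MAX - 1) / SECTION_MAX;
}
```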
City length
Cities 31 bytes or shorter are classified as "short" and take the happy path of fully parallelized processing. This applies to 100% of the cities in the challenge dataset.
If a semicolon isn't seen in the first 32 bytes the city is classified as "long" and takes a detour. These city names are processed in series and are stored in a parallel hash map before rejoining the main loop for number parsing, hashing, and updating. The short city name is changed to be a pointer to the long city name so that when printing results later on it can look up the full 127 byte name. This code path is not optimized.
Hashing
City names and measurements are stored in 2 separate fixed size hash tables. The city name hash table uses the 32 byte city name as both the key and the value, and linear probing is used to handle collisions. The measurements table doesn't have a key, and instead whatever index the city name is stored at is also used for the measurements.
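A minimal sketch of that two-table layout, assuming an illustrative table size of 1024 slots and a single `sums` array standing in for the full measurements table. The name `find_slot` and the use of an all-zero key as the "empty" marker are assumptions, not details taken from the repository.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

enum { KEY_BYTES = 32, SLOTS = 1024 };   /* SLOTS is an illustrative size */

struct tables {
    uint8_t names[SLOTS][KEY_BYTES];   /* the 32-byte name is key and value */
    int64_t sums[SLOTS];               /* stand-in for the measurements table */
};

/* Returns the slot for `key`, inserting it if absent.
 * Assumes the table never fills up and that no real key is all zeros. */
static uint32_t find_slot(struct tables *t, const uint8_t key[KEY_BYTES],
                          uint32_t hash) {
    static const uint8_t empty[KEY_BYTES] = {0};
    assert(memcmp(key, empty, KEY_BYTES) != 0);
    uint32_t i = hash % SLOTS;
    for (;;) {
        if (memcmp(t->names[i], key, KEY_BYTES) == 0) return i;
        if (memcmp(t->names[i], empty, KEY_BYTES) == 0) {
            memcpy(t->names[i], key, KEY_BYTES);
            return i;
        }
        i = (i + 1) % SLOTS;   /* linear probing: try the next slot */
    }
}
```

The index returned for the name is reused directly for the measurements, so `t->sums[find_slot(...)]` needs no second lookup.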
Storing them separately seems like a poor choice from a cache perspective. However doing so does give a 50% chance that city name collisions wind up on the same cache line, and the measurements data doesn't fit nicely into the remaining cache line anyway (for reasons discussed in the updating section).
For calculating the hash initially I just used Rust's fxhash on the first 8-16 bytes of the city and it was actually a bit faster than the current hashing. The CPU's superscalar nature easily hashed multiple cities in parallel despite the scalar code.
However this requires leaving each result in a general purpose register, and when adding in support for cities >= 32 characters the register pressure became a problem (I spent a lot of time with godbolt.org open while working on this). I wrote a simple hash function which xors the first 16 bytes of each city down to 4 bytes, finishing with a madd_epi16(). This frees up 8 general purpose registers, storing all 8 hash values in a single YMM register.
I know nothing of hash function theory, but it worked well enough for the dataset. Looking at only the first 16 bytes means that Alexandra/Alexandria collide, and there is 1 additional colliding pair for a 0.5% collision rate. Each collision causes ~25 cycles of delay, which averages out to 1 cycle/loop (8 rows × 0.5% × 25 cycles). Avoiding these collisions would take more time than that and isn't worth it.
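To make the xor-then-madd shape concrete, here is a scalar sketch for a single city. The multipliers `K_LO` and `K_HI` are made up for illustration (the real constants are not given above), and the function name `fold_hash` is hypothetical. The last step mimics what one 32-bit lane of madd_epi16 computes: two signed 16-bit products added together. A little-endian host is assumed.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical multipliers; not the constants used by the real solution. */
enum { K_LO = 31153, K_HI = 12289 };

static uint32_t fold_hash(const uint8_t name[32]) {
    assert(name != NULL);
    uint32_t w[4];
    memcpy(w, name, sizeof w);                /* only the first 16 bytes are read */
    uint32_t x = w[0] ^ w[1] ^ w[2] ^ w[3];   /* xor 16 bytes down to 4 */

    int16_t lane[2];
    memcpy(lane, &x, sizeof lane);            /* view as two signed 16-bit lanes */
    /* Same arithmetic as one 32-bit lane of madd_epi16. */
    int32_t m = (int32_t)lane[0] * K_LO + (int32_t)lane[1] * K_HI;
    return (uint32_t)m;
}
```

Since bytes 16 to 31 never enter the hash, names that share their first 16 bytes always land in the same bucket and are separated only by the probing step.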
Parsing
I'm pretty proud of the parsing code as it uses a ton of bit trickery to keep the instruction count down. It works out to only about 21 instructions to parse 8 numbers and calculate the line lengths. This was one of the spots where parsing 8 at a time is awkward since "-99.9" is 5 characters. This is handled by splitting the initial data across 2 registers before separating out the negative sign and re-aligning the digits.
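For reference, this is what the parser has to compute for a single row, written as plain scalar C. It is not the bit-trickery version and does not try to match its instruction count; `parse_tenths` is a hypothetical name, and the input is assumed to follow the challenge format of one or two integer digits, a dot, and one fractional digit, with an optional leading minus.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Parses "[-]D.D" or "[-]DD.D" and returns the value in tenths of a degree.
 * *len receives the number of characters consumed (newline not included). */
static int32_t parse_tenths(const char *s, size_t *len) {
    assert(s != NULL && len != NULL);
    int32_t neg = (s[0] == '-');
    const char *p = s + neg;
    int32_t two = (p[1] != '.');         /* 1 when there are two integer digits */
    int32_t v = p[0] - '0';
    v = two ? v * 10 + (p[1] - '0') : v;
    v = v * 10 + (p[2 + two] - '0');     /* the digit after the '.' */
    *len = (size_t)(neg + two + 3);
    return neg ? -v : v;
}
```

The longest case, "-99.9", consumes 5 characters, which is the awkward width mentioned above.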
This is one of the spots where targeting a Zen2 CPU hurts. Loading the two 4 x 8 byte chunks of data would be ideal for using 2 gather() instructions. However Zen2 implements gather with microcode and it's slow enough that the ~35 traditional mov/load instructions wind up faster.
Updating each row
Now that 8 rows are parsed, it's time to update the totals. Each city's row is fetched from memory and then transposed to column major order. This seems wasteful with about 20 instructions worth of shuffling in and out, but is completely paid for by only doing 4 "math" instructions instead of the 32 that scalar code would require.
Updating 8 values at once causes a problem though. If the same city appears in multiple streams then naively loading/updating a measurement would cause the earlier ones to be overwritten and the row's measurement lost. This is handled by storing 8 separate measurement entries, one for each stream. After all of the streams have completed they are reduced into a single entry.
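The per-stream copies and the final reduction might look roughly like this in scalar form. The `entry` layout here uses plain separate fields for clarity, and the names `init_copies`, `update` and `reduce` are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

enum { STREAMS = 8 };

struct entry { int64_t sum; int32_t count, min, max; };

static void init_copies(struct entry c[STREAMS]) {
    for (int s = 0; s < STREAMS; s++)
        c[s] = (struct entry){ 0, 0, INT32_MAX, INT32_MIN };
}

/* Stream s only ever touches c[s], so two streams that hit the same city
 * in the same iteration cannot overwrite each other's update. */
static void update(struct entry c[STREAMS], int s, int32_t tenths) {
    assert(s >= 0 && s < STREAMS);
    c[s].sum += tenths;
    c[s].count += 1;
    if (tenths < c[s].min) c[s].min = tenths;
    if (tenths > c[s].max) c[s].max = tenths;
}

/* Run once per city after all streams have finished. */
static struct entry reduce(const struct entry c[STREAMS]) {
    struct entry r = c[0];
    for (int s = 1; s < STREAMS; s++) {
        r.sum += c[s].sum;
        r.count += c[s].count;
        if (c[s].min < r.min) r.min = c[s].min;
        if (c[s].max > r.max) r.max = c[s].max;
    }
    return r;
}
```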
Having 8 copies of the measurement data is unfortunate as it pushes the measurement data out of the L1 cache. However there is enough L2 bandwidth and processing 8 values at once makes the stall not too painful.
To make each row fit in 32 bytes the obvious solution is a 64 bit sum, 32 bit count, and 16 bit min/max. However 16 bit min/max values require packing the computed 32 bit values to doubled 16 bit values, and also negating half of them. It turned out to be faster to make the sum and count share 64 bits, leaving min/max each to get 32 bits.
At first glance 64 bits isn't enough to store the sum and the count. The maximum possible sum, 999 billion, requires 40 bits, and 1 billion for the count requires 30 bits. However, the rows are split across 8 threads and 8 streams, so each entry at this step sees only about 1/64 of them and only 34 and 24 bits are actually needed.
Combining these values is made slightly tricky by negative sums, but that's handled by setting the initial sum to 2^35, leaving the higher bits always unused and free for the count.
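One way the shared 64-bit field could work is sketched below. The 2^35 bias comes from the text; the exact split (a 40-bit sum field below a 24-bit count field) and the helper names are assumptions. The bias keeps the low field positive, so adding a negative temperature never borrows from the count bits.

```c
#include <assert.h>
#include <stdint.h>

enum { SUM_BITS = 40 };                        /* assumed: 40-bit sum, 24-bit count */
#define SUM_BIAS  (UINT64_C(1) << 35)          /* initial sum, as described above */
#define SUM_MASK  ((UINT64_C(1) << SUM_BITS) - 1)
#define COUNT_ONE (UINT64_C(1) << SUM_BITS)

static uint64_t packed_init(void) { return SUM_BIAS; }

/* A single 64-bit add updates both fields: the temperature goes into the
 * low bits and COUNT_ONE increments the count in the high bits. */
static uint64_t packed_add(uint64_t p, int32_t tenths) {
    assert(tenths >= -999 && tenths <= 999);
    return p + COUNT_ONE + (uint64_t)(int64_t)tenths;
}

static int64_t packed_sum(uint64_t p) {
    return (int64_t)(p & SUM_MASK) - (int64_t)SUM_BIAS;
}

static uint32_t packed_count(uint64_t p) {
    return (uint32_t)(p >> SUM_BITS);
}
```

With a per-entry sum whose magnitude stays below 2^34, the biased value stays between 2^34 and 3 * 2^34, well inside the low field.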
Parallelization
To parallelize at the processor level this solution uses fork() instead of threads. This has the benefit of separating the address spaces, meaning the kernel is free to alter each process's page table fully in parallel.
The forking is done recursively with a branch width of 8. So in the 8 core case it's just a parent process forking 8 worker children. However if run with 64 cores it will first fork 8 children, which will then fork 8 workers each.
For very high thread counts, the kernel sometimes schedules these processes very poorly. Also when running on NUMA systems the page cache for the input file could be on a different node than where the worker has been scheduled. To improve this each worker is pinned to the CPU matching its chunk number.
The input file is chunked in roughly equal byte sizes and allocated to each worker. Each worker completes its entire chunk with no re-leveling. Each worker uses mmap() to map the chunk of file into its address space. Additionally it maps a page of dummy data immediately before this chunk, which is used by streams that finish early while other streams are still processing.
Each worker is given a block of shared memory to store its results in. It isn't used directly for hash tables though, since shared memory has huge pages disabled by default in Linux, and keeping the entire hash tables in the TLB is a significant performance win. So instead each worker allocates memory for its hash tables followed by an madvise() call on it to trigger transparent huge pages. After the worker completes it copies its data to the shared memory block.
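A minimal sketch of the private allocation plus madvise() hint, assuming Linux. The function name `alloc_table` is hypothetical, and the sketch ignores the madvise() return value because the call is only a hint and fails harmlessly when transparent huge pages are unavailable.

```c
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <assert.h>
#include <stddef.h>
#include <sys/mman.h>

/* Allocates zero-filled private memory for a hash table and asks the kernel
 * to back it with transparent huge pages. Returns NULL on failure. */
static void *alloc_table(size_t bytes) {
    assert(bytes > 0);
    void *p = mmap(NULL, bytes, PROT_READ | PROT_WRITE,
                   MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    if (p == MAP_FAILED) return NULL;
#ifdef MADV_HUGEPAGE
    (void)madvise(p, bytes, MADV_HUGEPAGE);   /* hint only */
#endif
    return p;
}
```

Sizing the tables as a multiple of the huge page size (2 MiB on x86-64) gives the kernel the best chance of honoring the hint.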
Once finished, each worker signals its parent through a pipe. The parent process listens to all its children using poll() and merges each child's results as soon as they come in. Once all the child results have been merged the parent then signals its own parent, which is either the main process or another worker higher in the fork tree.
The benefit of using a pipe, instead of just waiting for the child to exit, is that the parent process now doesn't have to wait for the child to unload its memory before merging results together.
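The signalling pattern can be sketched with a single level of workers. Everything here is illustrative: the name `fork_and_merge`, the stand-in result `i + 1`, and the flat (non-recursive) fork are assumptions. The sketch reaps its children at the end, which corresponds to running with the wait() flag rather than leaving orphans.

```c
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <assert.h>
#include <poll.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include <unistd.h>

enum { MAX_WORKERS = 8 };

/* Forks n workers. Worker i stores a stand-in result in shared memory and
 * then writes one byte to its pipe. Returns the merged total, or -1. */
static long fork_and_merge(int n) {
    assert(n > 0 && n <= MAX_WORKERS);
    size_t bytes = sizeof(long) * MAX_WORKERS;
    long *results = mmap(NULL, bytes, PROT_READ | PROT_WRITE,
                         MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (results == MAP_FAILED) return -1;

    struct pollfd fds[MAX_WORKERS];
    for (int i = 0; i < n; i++) {
        int p[2];
        if (pipe(p) != 0) return -1;
        pid_t pid = fork();
        if (pid < 0) return -1;
        if (pid == 0) {                      /* worker */
            close(p[0]);
            results[i] = i + 1;              /* stand-in for real work */
            char done = 1;
            if (write(p[1], &done, 1) != 1) _exit(1);
            _exit(0);
        }
        close(p[1]);
        fds[i].fd = p[0];
        fds[i].events = POLLIN;
    }

    long total = 0;
    for (int pending = n; pending > 0;) {
        if (poll(fds, (nfds_t)n, -1) < 0) return -1;
        for (int i = 0; i < n; i++) {
            if (fds[i].fd < 0 || !(fds[i].revents & (POLLIN | POLLHUP))) continue;
            char done;
            if (read(fds[i].fd, &done, 1) == 1) total += results[i];  /* merge now */
            close(fds[i].fd);
            fds[i].fd = -1;                  /* poll() ignores negative fds */
            pending--;
        }
    }
    while (wait(NULL) > 0) {}                /* optional: reap the children */
    munmap(results, bytes);
    return total;
}
```

The merge happens as soon as the byte arrives, while the child may still be tearing down its own mappings.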
Now that the parent process has the data, it doesn't actually need to wait for its children to fully exit. This leaves the main process free to actually print the result and exit before cleanup has completed. Leaving orphan children like this is arguably against the spirit of the competition, and there is a flag to fully wait() for all child processes before exiting.
Prefetching
CPUs have gotten really good at prefetching, but there were still small gains to be made. It's a total black box, but issuing a prefetch for a line of memory slightly in advance lowered times by about 6% (and 8% on the 10k dataset). For comparison, on an older Haswell CPU prefetching reduced the runtime by 35%!
perf is a hugely useful tool for looking into performance, with CPU counters like cache and branch misses and also instruction-level profiling. Sadly it can't be used on cloud instances, which meant I couldn't dig as deep into optimizing this for Zen2 as I would have liked.
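The general idea of prefetching a table line a few iterations ahead can be sketched like this, using the GCC/Clang `__builtin_prefetch` builtin. The distance `AHEAD` and the function name are illustrative assumptions, not tuned values or code from the solution.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

enum { AHEAD = 4 };   /* illustrative prefetch distance */

/* Sums table[idx[i]] while requesting the line needed AHEAD iterations later. */
static int64_t sum_with_prefetch(const int64_t *table, const uint32_t *idx,
                                 size_t n) {
    assert(table != NULL && idx != NULL);
    int64_t total = 0;
    for (size_t i = 0; i < n; i++) {
        if (i + AHEAD < n)
            __builtin_prefetch(&table[idx[i + AHEAD]], 0, 3);  /* read, keep in cache */
        total += table[idx[i]];
    }
    return total;
}
```

The prefetch changes only timing, never the result, so the right distance has to be found by measurement on the target CPU.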