Direct access to message payloads #51
Just in case you didn't notice, gribberish python does have kerchunk support https://github.com/mpiannucci/gribberish/tree/main/python/gribberish/kerchunk
It's a bit nuanced because there are so many different data templates that data can come in. If we think through this a bit, each grib message is composed of sections. Each section starts with the section length and the section number, so we do indeed have access to the start byte of the data section. The tricky part comes when we want to parse the data. To generically read the data array from a given grib message, you need access to the Data Representation Section, the Bitmap Section, and the Data Section; in any given grib message these are sections 5 through 7. There are some data packings that do not use the bitmap section, but in those cases that section is only about 8 octets long, so it has essentially no impact.

So the next question is: do we need the Data Representation Section? And my answer is I'm not totally sure. The data representation section encodes the Data Representation Template, which describes how to transform the data from bytes to an array. I am not totally clear whether these values are shared across variables, timesteps, etc., and we would have to check. So at a minimum, the answer to this question is this:
The codec would need to be updated to support these
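To make the "start byte of the data section" point concrete, here is a minimal sketch (my own code, not gribberish or kerchunk API) of walking the sections of one GRIB2 message to get the byte ranges of sections 5-7. It assumes the whole message is in memory and that no section number repeats; real messages can repeat sections 2-7, so a production version would return a list per section number.

```python
import struct

def grib2_section_offsets(msg: bytes) -> dict[int, tuple[int, int]]:
    """Return {section_number: (offset, length)} for one GRIB2 message.

    Section 0 is the fixed 16-byte indicator starting with b"GRIB", and the
    message ends with the 4-byte b"7777" trailer; every other section begins
    with a 4-byte big-endian length followed by a 1-byte section number.
    """
    assert msg[:4] == b"GRIB", "not a GRIB message"
    sections = {0: (0, 16)}
    pos = 16
    while pos < len(msg):
        if msg[pos:pos + 4] == b"7777":          # end-of-message marker
            sections[8] = (pos, 4)
            break
        length = struct.unpack(">I", msg[pos:pos + 4])[0]
        number = msg[pos + 4]
        sections[number] = (pos, length)
        pos += length
    return sections

# Sections 5 (data representation), 6 (bitmap) and 7 (data) are what a
# payload-only reference would need to point at, e.g.:
# offs = grib2_section_offsets(open("message.grib2", "rb").read())
# drs_off, drs_len = offs[5]
# data_off, data_len = offs[7]
```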
I am not sure I understand this question totally. If you mean can we split a data stream into multiple chunks, the answer is that it depends on the Data Representation template: if it's JPEG (GFS Wave) it's not possible, and for Complex (HRRR, GFS, GEFS) I don't think it's possible either, which would limit its utility because those are some of the largest message streams. Furthermore, you would still (probably) need the Data Representation Section (unless I'm proven otherwise through investigation), so any benefits to sub-chunking would be negligible. I'm interested in the benefits this could bring, so happy to continue this discussion.
I'm not sure I've used that :|. Is there any comparison of speed or capabilities? Or is the main difference more simply that references produced here will also use gribberish to decode rather than eccodes?
Here is a pre-release blog post by @rsignell demonstrating splitting of native chunks for the sake of better random access and smaller byte ranges: https://medium.com/@rsignell_93546/9008ba6d0d67 . That case is easy, because the netCDF3 data is uncompressed, but looking at simple unpack, it seems like one could do the same here?
As far as I understand the Simple case: I don't think so, these are effectively parameters of the Codec, and while they might vary from one message to the next, for each particular message type (level, etc.) of a series of files they will be the same. Whether or not there is a bitmap will also be predetermined. I would reckon that for encodings 0, 1, 2, 4 you already have the necessary code to take only 1/N equal parts of a buffer. That covers a large fraction of use cases. Question @emfdavid : do you happen to know the size of the biggest message you encounter, and an estimate of what fraction of that is useful bytes in one of your typical workflows?
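If the byte arithmetic helps, here is a rough sketch of the 1/N idea for simple packing (variable names are mine, not any library's API): because each value occupies a fixed number of bits, the byte range for any contiguous run of values is pure arithmetic, and only the reference value, binary scale and decimal scale from section 5 are needed to decode it.

```python
import math

def simple_packed_byte_range(first: int, count: int, bits_per_value: int,
                             data_section_offset: int) -> tuple[int, int]:
    """Byte range covering values [first, first + count) of a simple-packed
    GRIB2 data section.

    `data_section_offset` is the start of section 7 within the file; the
    packed bit-stream begins 5 bytes later (after the 4-byte section length
    and 1-byte section number).  Decoding a packed integer X still needs the
    reference value R, binary scale E and decimal scale D from section 5:
        Y = (R + X * 2**E) / 10**D
    """
    payload = data_section_offset + 5
    start_bit = first * bits_per_value
    stop_bit = (first + count) * bits_per_value
    return payload + start_bit // 8, payload + math.ceil(stop_bit / 8)

# e.g. values 500_000..600_000 of a field packed at 12 bits per value:
# start, stop = simple_packed_byte_range(500_000, 100_000, 12, data_off)
```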
The largest length in my BQ table for the HRRR Surface files is 3565237 bytes.
On the xarray front, currently it is much faster to read in the dataset, and when kerchunking it has in my opinion a better API for choosing which messages you actually care about. As for read speed, here is a simple benchmark for Complex, PNG, and JPEG compressed data, reading from python between gribberish and eccodes: gribberish is faster in many cases, and at worst more stable than eccodes.
Correct, it could; the issue is that none of the most interesting data uses simple packing, so it's not totally useful.
I don't think this is true. Each message is compressed using Complex Packing and Spatial Differencing for HRRR, GFS, GEFS data, which are some of the only datasets that are big enough that it would make sense to split a given message into multiple chunks. GFS, GEFS Wave data are JPEG2000 packed, which also cannot be subchunked AFAIK.
Yes correct, each variable and possibly each timestep is compressed individually, so you cannot assume packing length for non-simple-packed messages.
Agreed, I don't think j2k can be split. At least I don't know how, but given that zlib/deflate can be, maybe it's possible with substantial effort.
An idea suggested by @gatesn is that, in theory, we could enable random access into any compressed file by storing the internal state of the decompressor at regular intervals (say, every 4 kB). This would require someone to read through the entire dataset to create a file storing the decompressor's internal state every 4 kB. And it would require code modifications deep inside the guts of the codec! So it probably would be a lot of work. But might be kind of fun...
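A toy, in-memory sketch of that checkpointing idea using Python's zlib (so it only illustrates DEFLATE-style, zlib-wrapped streams, not GRIB's complex or JPEG2000 packing, and the decompressor copies live in RAM rather than in a sidecar file):

```python
import zlib

def build_checkpoints(compressed: bytes, every: int = 4096, step: int = 1024):
    """Single pass over a zlib stream, snapshotting the decompressor roughly
    every `every` bytes of decompressed output.

    Each checkpoint is (output_offset, input_offset, decompressor_copy):
    resuming decompressor_copy on compressed[input_offset:] continues the
    output stream at output_offset.
    """
    d = zlib.decompressobj()
    checkpoints = [(0, 0, d.copy())]
    out_len = 0
    for i in range(0, len(compressed), step):
        out_len += len(d.decompress(compressed[i:i + step]))
        if out_len >= checkpoints[-1][0] + every:
            checkpoints.append((out_len, i + step, d.copy()))
    return checkpoints

def read_at(compressed: bytes, checkpoints, offset: int, size: int) -> bytes:
    """Return `size` decompressed bytes starting at `offset`, decompressing
    only from the nearest checkpoint instead of from the beginning."""
    out_off, in_off, snap = max((c for c in checkpoints if c[0] <= offset),
                                key=lambda c: c[0])
    d, produced, pieces = snap.copy(), out_off, []
    for i in range(in_off, len(compressed), 1024):
        chunk = d.decompress(compressed[i:i + 1024])
        pieces.append(chunk)
        produced += len(chunk)
        if produced >= offset + size:
            break
    data = b"".join(pieces)
    return data[offset - out_off: offset - out_off + size]
```

The catch the comment already hints at: the checkpoint copies are in-process objects, so persisting them "every 4 kB" for a whole dataset would need lower-level access to the codec's state, which is where the real work would be.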
Actually, looking at the docs for openjpeg (the reference jpeg2000 implementation)... it looks like jp2k uses concepts which might be similar to "chunks". jp2k uses "tiles", "precincts", and "code-block size" (default 64x64). And openjpeg can output an index file "that summarizes the codestream. This index is useful to create a navigation process." So maybe jp2k is kind of chunked already?! (But I'm way out of my comfort zone here!)
It might be! Dealing with the openjpeg raw bindings makes this more annoying to solve though, haha. I think the Complex and Spatial Differencing data representation is far more ubiquitous and more impactful to figure out. It may be possible if the differencing only happens within single rows? I'll have to relearn the compression; I implemented it over a year ago now.
Will subchunking the spatial domain really help much for grib2 workflows? You can split the spatial domain, but you are still stuck with 1 time value in each chunk, right? And if the entire-spatial-domain chunk is only 25MB or something, splitting it further wouldn't be that much of a benefit, would it?
Correct, my guess is that it's probably not worth it because it will pretty much always be 1 time step per chunk, and even the largest grib chunks aren't that big in the first place, plus the grib metadata isn't significant IMO. But I don't do ML at the scale of Jack and David.
Can we do a comparison of a bunch of gribs in a series, to see if the data representation section has the same values for a particular message in each file? @emfdavid : the throughput you got with your parallel tricks in zarr, were they using gribberish as the decoder?
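One cheap way to run that comparison would be to pull the raw bytes of section 5 for the same message out of each file in a series and check whether they are identical, reusing a section-walking helper like the one sketched earlier in the thread. The paths and offsets below are placeholders; in practice they would come from a kerchunk reference set or the .idx sidecars.

```python
import fsspec

def drs_bytes(path: str, msg_offset: int, msg_length: int) -> bytes:
    """Fetch one grib message and return the raw bytes of its Data
    Representation Section (section 5)."""
    with fsspec.open(path, "rb") as f:
        f.seek(msg_offset)
        msg = f.read(msg_length)
    # grib2_section_offsets: the section-walking sketch earlier in the thread
    off, length = grib2_section_offsets(msg)[5]
    return msg[off:off + length]

# Hypothetical series: (url, offset, length) of the same message per file.
# series = [("s3://bucket/hrrr.t00z.wrfsfcf01.grib2", 123_456, 3_500_000), ...]
# unique = {drs_bytes(*entry) for entry in series}
# print("section 5 identical across the series" if len(unique) == 1
#       else f"{len(unique)} distinct section-5 encodings")
```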
@martindurant Gribberish was not complete when I started moving code into production in February.
It might help when you only want a single pixel in the spatial domain, and many timesteps? e.g. you want a multi-year time series for a single geographical point?
@JackKelly Maybe, but I would like to be convinced! :) We could test this idea if we already had kerchunked grib data from two different models with different spatial dimension sizes. For example, we could try reading 1000 time steps from HRRR (15MB chunks) and GFS 1/4 degree resolution (4MB chunks) with the same cluster and see how much faster GFS is. Does someone have kerchunked datasets for HRRR and GFS?
Hehe :) Yeah, I totally agree that it'd be good to get empirical evidence. To give a tiny bit of context: I'm interested in training large machine learning models directly from huge GRIB datasets on cloud object storage. In the ideal case, I'd like to sustain several gigabytes per second to each GPU. And we'd often want to read just a few spatial locations, across many years. And I'd like to do a bunch of numerical processing on-the-fly (like normalising each value). So reducing the time to open a single GRIB message from, say, 5 ms to 2.5 ms really matters to me 🙂
Here is a Gist with two years for HRRR and GFS (6 and 67-72 hour horizon) with two variables dswrf and t2m. Looks like a couple forecast runs were missing for GFS? You will probably need to handle exceptions in the Codec where some of the NODD grib files are incomplete...
Okay! Armed with @emfdavid's refs for both HRRR (14.54MB chunks) and GFS (7.92MB chunks), I tried reading 5000 chunks from each using a Coiled-generated Dask cluster of 50 workers (100 total threads). The results:
So even though the HRRR chunks are 83% bigger, they took only 20% longer to read. Here's the gist of the notebook I used. I created the coiled cluster environment with:
where the conda environment is:

```yaml
channels:
  - conda-forge
dependencies:
  - python=3.11
  - adlfs
  - cf_xarray
  - cfunits
  - coiled
  - curl
  - dask>=2024.7.0
  - python-eccodes
  - eccodes
  - fastparquet
  - fsspec>=2024.2.0
  - gdal
  - gcsfs
  - h5netcdf
  - h5py
  - intake>=2.0
  - intake-stac
  - intake-xarray
  - intake-parquet
  - ipywidgets
  - ipykernel
  - kerchunk
  - metpy
  - netcdf4
  - numba
  - numcodecs
  - numpy<2
  - pandas
  - planetary-computer
  - pyepsg
  - pystac
  - pystac-client
  - python-snappy
  - rechunker
  - s3fs
  - ujson
  - vim
  - zstandard
  - xarray-datatree
  - zarr
```
@rsignell Mic Drop 😆
(let's be sure to include those in intake catalogs, probably on public-data)
Thanks for doing the test! That was speedy!
That's surprising. Do we have any clues as to why that is? What's dominating the runtime, if it's not IO?
This is the in-memory size, no? So the compression ratios might be different.
Right, I believe the compression ratios are different for every message, no matter the model. Otherwise I don't think spatial differencing works.
As I understand complex packing: the values are split into groups, and each group is stored with its own reference value and bit width. Since the bit-widths of each group are data dependent, the data size will vary for each group even though they have the same number of values. Each instance of a message at each time-point therefore will have a different data size, but probably not by much! However, each message type can have very different bits-per-value.
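A back-of-the-envelope sketch of why the packed size is data dependent: the data section is roughly the sum of group_width * group_length bits plus per-group header information, and the widths are chosen per group from the actual values. All names below are illustrative, not any library's API.

```python
def complex_packed_bits(group_widths, group_lengths,
                        header_bits_per_group: int = 0) -> int:
    """Approximate size in bits of a complex-packed data section, given the
    per-group bit widths and per-group value counts.  The group references,
    widths and lengths also have to be stored; they are folded into
    header_bits_per_group here for simplicity."""
    assert len(group_widths) == len(group_lengths)
    return sum(w * n + header_bits_per_group
               for w, n in zip(group_widths, group_lengths))

# Two messages with identical group lengths but different data can end up
# with different widths, hence different packed sizes:
# complex_packed_bits([4, 5, 4], [256, 256, 256])  -> 3328 bits
# complex_packed_bits([6, 5, 7], [256, 256, 256])  -> 4608 bits
```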
Based on logs from my latest training runs I get the following for a single container running in a Kube cluster on a 16-CPU pod, using joblib for parallelism. HRRR
GFS
Each task is one slice of one variable from a grib file. I was feeling pretty depressed about ^ but if you run the numbers in CPU seconds, I am not doing too badly. Rich had 13.9 s on 100 CPUs -> 1390 CPU-seconds. I am very impressed with the parallel efficiency in coiled/dask!
I would expect the compression ratio would be similar for these two models though -- they are both large-scale atmospheric models and we are looking at the same variable. I guess it wouldn't be that hard to check -- we just need to look at the file sizes in object storage!
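Checking whole-file sizes is a one-liner with fsspec once you know the object keys (the NODD bucket paths below are placeholders; substitute real keys). Per-message lengths, e.g. from the .idx sidecars or the kerchunk references, would give a cleaner per-variable comparison.

```python
import fsspec

fs = fsspec.filesystem("s3", anon=True)
# Placeholder object keys for one HRRR and one GFS forecast file.
hrrr = "s3://noaa-hrrr-bdp-pds/hrrr.20240101/conus/hrrr.t00z.wrfsfcf01.grib2"
gfs = "s3://noaa-gfs-bdp-pds/gfs.20240101/00/atmos/gfs.t00z.pgrb2.0p25.f006"
print("HRRR:", fs.info(hrrr)["size"], "bytes")
print("GFS: ", fs.info(gfs)["size"], "bytes")
```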
Perhaps the grouping size or even difference order is an input parameter (I don't know). I could imagine choosing different group sizes might greatly affect the compression ratio.
From a conversation with @emfdavid and @JackKelly
As I think you know, kerchunk currently works with grib files by finding the start/size bytes of each message and, at load time, passing this blob of bytes to eccodes to make a whole in-memory grib message; the output of the process is the numerical array it represents. That means that all of the various attributes and coordinate definitions are interpreted each time, even though this information is already known from the initial scan and stored in the reference set.
Do you think there's a way, from the code here, to find the start/size of only the payload of each message, so that at read time we can grab fewer bytes and do less work making the output array? What information would need to be available at load time?
Furthermore, do you think it might be possible to chop such a message buffer into equal sections (along the slowest-varying/outer array dimension), such that each of those sections could be read independently?
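For context, the reference set today stores whole-message byte ranges as (url, offset, length) triples. A "payload-only" variant would shift the offset to the start of section 7 (or 5) and shrink the length, and would have to carry whatever section-5 parameters the decoder needs at load time. A hypothetical before/after, with made-up paths, numbers, and codec name:

```python
# Whole-message reference as kerchunk writes it today: the grib codec gets
# the full message and hands it to eccodes (numbers are illustrative; the
# length matches the ~3.5 MB HRRR maximum mentioned above).
refs_today = {
    "t2m/0.0.0": ["s3://bucket/hrrr.t00z.wrfsfcf01.grib2", 41_236_992, 3_565_237],
}

# Hypothetical payload-only reference: offset/length cover only sections
# 5-7, and the decode parameters are stored once per variable in the codec
# configuration instead of being re-parsed from every message.
refs_payload_only = {
    "t2m/0.0.0": ["s3://bucket/hrrr.t00z.wrfsfcf01.grib2", 41_237_500, 3_560_000],
}
codec_config = {
    "id": "grib_payload",          # made-up codec name
    "drs_template": 3,             # complex packing + spatial differencing
    "bits_per_value": 12,
    "reference_value": 230.0,
    "binary_scale": 0,
    "decimal_scale": 2,
}
```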