Compatibility with Zarr v3b2 (#9795)
* Compatibility with Zarr v3b2

* More guards with mode="w"

* refactoring

* tweak expected requests

* compat

* more compat

* fix
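
The user-facing entry point for all of this is `Dataset.to_zarr`. As a rough sketch (not part of the diff; the store path and variable names are made up), these are the calls whose behaviour the new `mode="w"` guards and the Zarr v3b2 compatibility fixes are aimed at:

```python
import numpy as np
import xarray as xr

ds = xr.Dataset({"t": ("time", np.arange(4))})

# mode="w" overwrites the target group; with this change xarray no longer
# needs to list the existing arrays first in that case.
ds.to_zarr("example.zarr", mode="w")

# mode="a" / mode="r+" still consult the existing arrays, e.g. for appends.
ds.to_zarr("example.zarr", mode="a", append_dim="time")
```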
dcherian authored Nov 21, 2024
1 parent 6942d68 commit e510a9e
Showing 3 changed files with 138 additions and 118 deletions.
183 changes: 102 additions & 81 deletions xarray/backends/zarr.py
@@ -37,6 +37,7 @@
from xarray.namedarray.utils import module_available

if TYPE_CHECKING:
from zarr import Array as ZarrArray
from zarr import Group as ZarrGroup

from xarray.backends.common import AbstractDataStore
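
The new `ZarrArray` alias is imported only under `TYPE_CHECKING`, so zarr remains an optional runtime dependency while the helper methods added below can still be annotated. A generic sketch of that pattern (illustrative, not the xarray code):

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # only evaluated by static type checkers, never at runtime
    from zarr import Array as ZarrArray


def open_array(path: str) -> ZarrArray:
    import zarr  # runtime import stays deferred and optional

    return zarr.open_array(path)
```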
@@ -443,7 +444,7 @@ def extract_zarr_variable_encoding(
shape = shape if shape else variable.shape
encoding = variable.encoding.copy()

safe_to_drop = {"source", "original_shape"}
safe_to_drop = {"source", "original_shape", "preferred_chunks"}
valid_encodings = {
"codecs",
"chunks",
@@ -871,16 +872,27 @@ def store(
else:
zarr = attempt_import("zarr")

existing_keys = tuple(self.zarr_group.array_keys())
if self._mode == "w":
# always overwrite, so we don't care about existing names,
# and consistency of encoding
new_variable_names = set(variables)
existing_keys = {}
existing_variable_names = {}
else:
existing_keys = tuple(self.zarr_group.array_keys())
existing_variable_names = {
vn for vn in variables if _encode_variable_name(vn) in existing_keys
}
new_variable_names = set(variables) - existing_variable_names

if self._mode == "r+":
new_names = [k for k in variables if k not in existing_keys]
if new_names:
raise ValueError(
f"dataset contains non-pre-existing variables {new_names}, "
"which is not allowed in ``xarray.Dataset.to_zarr()`` with "
"``mode='r+'``. To allow writing new variables, set ``mode='a'``."
)
if self._mode == "r+" and (
new_names := [k for k in variables if k not in existing_keys]
):
raise ValueError(
f"dataset contains non-pre-existing variables {new_names!r}, "
"which is not allowed in ``xarray.Dataset.to_zarr()`` with "
"``mode='r+'``. To allow writing new variables, set ``mode='a'``."
)

if self._append_dim is not None and self._append_dim not in existing_keys:
# For dimensions without coordinate values, we must parse
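
The idea behind the new branch: with `mode="w"` every array is (re)created from scratch, so there is no point in listing `array_keys()` (an extra store request, see the "tweak expected requests" commit bullet) or reconciling encodings with what is already stored, while `mode="r+"` still refuses to introduce new variables. A condensed, illustrative restatement of that logic:

```python
# Condensed restatement of the branching above (simplified, not verbatim).
def split_variable_names(variables, mode, list_existing):
    if mode == "w":
        # Overwriting: skip listing the store and reconciling encodings.
        return set(variables), set()
    existing_keys = set(list_existing())  # e.g. zarr_group.array_keys()
    existing = {name for name in variables if name in existing_keys}
    new = set(variables) - existing
    if mode == "r+" and new:
        raise ValueError(f"mode='r+' cannot create new variables: {sorted(new)}")
    return new, existing
```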
@@ -895,10 +907,6 @@ def store(
f"dataset dimensions {existing_dims}"
)

existing_variable_names = {
vn for vn in variables if _encode_variable_name(vn) in existing_keys
}
new_variable_names = set(variables) - existing_variable_names
variables_encoded, attributes = self.encode(
{vn: variables[vn] for vn in new_variable_names}, attributes
)
@@ -920,10 +928,9 @@ def store(
# Modified variables must use the same encoding as the store.
vars_with_encoding = {}
for vn in existing_variable_names:
if self._mode in ["a", "a-", "r+"]:
_validate_datatypes_for_zarr_append(
vn, existing_vars[vn], variables[vn]
)
_validate_datatypes_for_zarr_append(
vn, existing_vars[vn], variables[vn]
)
vars_with_encoding[vn] = variables[vn].copy(deep=False)
vars_with_encoding[vn].encoding = existing_vars[vn].encoding
vars_with_encoding, _ = self.encode(vars_with_encoding, {})
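
Because `existing_variable_names` is now empty when `mode="w"`, the dtype check for appends can run unconditionally here. For orientation only, a much-simplified sketch of what such a check does (the real `_validate_datatypes_for_zarr_append` has its own rules and messages):

```python
# Much-simplified, hypothetical illustration of an append-time dtype check.
def check_append_dtype(name, existing_dtype, new_dtype):
    if existing_dtype != new_dtype:
        raise ValueError(
            f"cannot append {new_dtype} data to variable {name!r} "
            f"stored with dtype {existing_dtype}"
        )
```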
@@ -968,6 +975,69 @@ def store(
def sync(self):
pass

def _open_existing_array(self, *, name) -> ZarrArray:
import zarr

# TODO: if mode="a", consider overriding the existing variable
# metadata. This would need some case work properly with region
# and append_dim.
if self._write_empty is not None:
# Write to zarr_group.chunk_store instead of zarr_group.store
# See https://github.com/pydata/xarray/pull/8326#discussion_r1365311316 for a longer explanation
# The open_consolidated() enforces a mode of r or r+
# (and to_zarr with region provided enforces a read mode of r+),
# and this function makes sure the resulting Group has a store of type ConsolidatedMetadataStore
# and a 'normal Store subtype for chunk_store.
# The exact type depends on if a local path was used, or a URL of some sort,
# but the point is that it's not a read-only ConsolidatedMetadataStore.
# It is safe to write chunk data to the chunk_store because no metadata would be changed by
# to_zarr with the region parameter:
# - Because the write mode is enforced to be r+, no new variables can be added to the store
# (this is also checked and enforced in xarray.backends.api.py::to_zarr()).
# - Existing variables already have their attrs included in the consolidated metadata file.
# - The size of dimensions can not be expanded, that would require a call using `append_dim`
# which is mutually exclusive with `region`
zarr_array = zarr.open(
store=(
self.zarr_group.store if _zarr_v3() else self.zarr_group.chunk_store
),
# TODO: see if zarr should normalize these strings.
path="/".join([self.zarr_group.name.rstrip("/"), name]).lstrip("/"),
write_empty_chunks=self._write_empty,
)
else:
zarr_array = self.zarr_group[name]

return zarr_array

def _create_new_array(
self, *, name, shape, dtype, fill_value, encoding, attrs
) -> ZarrArray:
if coding.strings.check_vlen_dtype(dtype) is str:
dtype = str

if self._write_empty is not None:
if (
"write_empty_chunks" in encoding
and encoding["write_empty_chunks"] != self._write_empty
):
raise ValueError(
'Differing "write_empty_chunks" values in encoding and parameters'
f'Got {encoding["write_empty_chunks"] = } and {self._write_empty = }'
)
else:
encoding["write_empty_chunks"] = self._write_empty

zarr_array = self.zarr_group.create(
name,
shape=shape,
dtype=dtype,
fill_value=fill_value,
**encoding,
)
zarr_array = _put_attrs(zarr_array, attrs)
return zarr_array
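
The `write_empty_chunks` handling in `_create_new_array` boils down to a precedence rule: an explicit store-level setting must agree with any per-variable encoding, and otherwise fills it in. Restated as a small hypothetical helper (not part of the commit):

```python
# Hypothetical restatement of the write_empty_chunks precedence above.
def merge_write_empty(encoding, write_empty):
    encoding = dict(encoding)
    if write_empty is None:
        return encoding  # nothing requested at the store level
    if "write_empty_chunks" in encoding and encoding["write_empty_chunks"] != write_empty:
        raise ValueError('conflicting "write_empty_chunks" in encoding and parameters')
    encoding.setdefault("write_empty_chunks", write_empty)
    return encoding
```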

def set_variables(self, variables, check_encoding_set, writer, unlimited_dims=None):
"""
This provides a centralized method to set the variables on the data
@@ -986,8 +1056,6 @@ def set_variables(variables, check_encoding_set, writer, unlimited_dims=No
dimensions.
"""

import zarr

existing_keys = tuple(self.zarr_group.array_keys())
is_zarr_v3_format = _zarr_v3() and self.zarr_group.metadata.zarr_format == 3

@@ -1016,47 +1084,13 @@ def set_variables(variables, check_encoding_set, writer, unlimited_dims=No
else:
del v.encoding["_FillValue"]

zarr_array = None
zarr_shape = None
write_region = self._write_region if self._write_region is not None else {}
write_region = {dim: write_region.get(dim, slice(None)) for dim in dims}

if name in existing_keys:
if self._mode != "w" and name in existing_keys:
# existing variable
# TODO: if mode="a", consider overriding the existing variable
# metadata. This would need some case work properly with region
# and append_dim.
if self._write_empty is not None:
# Write to zarr_group.chunk_store instead of zarr_group.store
# See https://github.com/pydata/xarray/pull/8326#discussion_r1365311316 for a longer explanation
# The open_consolidated() enforces a mode of r or r+
# (and to_zarr with region provided enforces a read mode of r+),
# and this function makes sure the resulting Group has a store of type ConsolidatedMetadataStore
# and a 'normal Store subtype for chunk_store.
# The exact type depends on if a local path was used, or a URL of some sort,
# but the point is that it's not a read-only ConsolidatedMetadataStore.
# It is safe to write chunk data to the chunk_store because no metadata would be changed by
# to_zarr with the region parameter:
# - Because the write mode is enforced to be r+, no new variables can be added to the store
# (this is also checked and enforced in xarray.backends.api.py::to_zarr()).
# - Existing variables already have their attrs included in the consolidated metadata file.
# - The size of dimensions can not be expanded, that would require a call using `append_dim`
# which is mutually exclusive with `region`
zarr_array = zarr.open(
store=(
self.zarr_group.store
if _zarr_v3()
else self.zarr_group.chunk_store
),
# TODO: see if zarr should normalize these strings.
path="/".join([self.zarr_group.name.rstrip("/"), name]).lstrip(
"/"
),
write_empty_chunks=self._write_empty,
)
else:
zarr_array = self.zarr_group[name]

zarr_array = self._open_existing_array(name=name)
if self._append_dim is not None and self._append_dim in dims:
# resize existing variable
append_axis = dims.index(self._append_dim)
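
When appending, the stored array is resized along the append axis before the new region is written. The shape arithmetic, shown as a hypothetical helper rather than the actual xarray code:

```python
# Hypothetical illustration of the resize-on-append step: grow the stored
# array along the append axis by the size of the incoming data.
def resized_shape(existing_shape, new_shape, append_axis):
    shape = list(existing_shape)
    shape[append_axis] += new_shape[append_axis]
    return tuple(shape)


resized_shape((10, 3), (4, 3), append_axis=0)  # -> (14, 3)
```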
@@ -1089,40 +1123,27 @@ def set_variables(variables, check_encoding_set, writer, unlimited_dims=No
shape=zarr_shape,
)

if name not in existing_keys:
if self._mode == "w" or name not in existing_keys:
# new variable
encoded_attrs = {}
encoded_attrs = {k: self.encode_attribute(v) for k, v in attrs.items()}
# the magic for storing the hidden dimension data
if is_zarr_v3_format:
encoding["dimension_names"] = dims
else:
encoded_attrs[DIMENSION_KEY] = dims
for k2, v2 in attrs.items():
encoded_attrs[k2] = self.encode_attribute(v2)

if coding.strings.check_vlen_dtype(dtype) is str:
dtype = str

if self._write_empty is not None:
if (
"write_empty_chunks" in encoding
and encoding["write_empty_chunks"] != self._write_empty
):
raise ValueError(
'Differing "write_empty_chunks" values in encoding and parameters'
f'Got {encoding["write_empty_chunks"] = } and {self._write_empty = }'
)
else:
encoding["write_empty_chunks"] = self._write_empty

zarr_array = self.zarr_group.create(
name,
shape=shape,

encoding["exists_ok" if _zarr_v3() else "overwrite"] = (
True if self._mode == "w" else False
)

zarr_array = self._create_new_array(
name=name,
dtype=dtype,
shape=shape,
fill_value=fill_value,
**encoding,
encoding=encoding,
attrs=encoded_attrs,
)
zarr_array = _put_attrs(zarr_array, encoded_attrs)

writer.add(v.data, zarr_array, region)
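
The two branches above record dimension names differently: format 3 puts them in the array metadata via `dimension_names`, while format 2 stores them in the reserved `_ARRAY_DIMENSIONS` attribute (which is what xarray's `DIMENSION_KEY` refers to). A compact sketch of that split:

```python
# Compact sketch of where dimension names end up for each on-disk format.
def dimension_metadata(dims, zarr_format):
    if zarr_format == 3:
        # format 3: dimension names are first-class array metadata
        return {"dimension_names": dims}, {}
    # format 2: dimension names travel as a reserved attribute
    return {}, {"_ARRAY_DIMENSIONS": list(dims)}


encoding_extra, attrs_extra = dimension_metadata(("time", "x"), zarr_format=3)
```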

1 change: 1 addition & 0 deletions xarray/tests/__init__.py
@@ -141,6 +141,7 @@ def _importorskip(
has_pint, requires_pint = _importorskip("pint")
has_numexpr, requires_numexpr = _importorskip("numexpr")
has_flox, requires_flox = _importorskip("flox")
has_netcdf, requires_netcdf = _importorskip("netcdf")
has_pandas_ge_2_2, requires_pandas_ge_2_2 = _importorskip("pandas", "2.2")
has_pandas_3, requires_pandas_3 = _importorskip("pandas", "3.0.0.dev0")
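
`_importorskip` returns a boolean availability flag together with a pytest skip marker, so tests that exercise the new dependency can be decorated with `requires_netcdf`. A simplified sketch of the pattern (the real helper also handles minimum versions):

```python
import importlib

import pytest


def _importorskip(modname):
    # Simplified: try to import the module and build a matching skip marker.
    try:
        importlib.import_module(modname)
        has = True
    except ImportError:
        has = False
    func = pytest.mark.skipif(not has, reason=f"requires {modname}")
    return has, func


has_netcdf, requires_netcdf = _importorskip("netcdf")


@requires_netcdf
def test_roundtrip_needs_netcdf():
    ...
```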

