"""
Contain the create_optimized_zarr_store context manager.
This context manager is used to create an "optimized" zarr store from a given input
dataset. The optimization is done by rechunking the input dataset according to a
given chunking schema.
"""
from __future__ import annotations
import contextlib
import copy
from typing import TYPE_CHECKING
import dask
import fsspec
import psutil
import xarray as xr
import zarr
from fsspec import AbstractFileSystem
from fsspec.implementations.local import LocalFileSystem
from rechunker import rechunk
from icclim._core.input_parsing import is_zarr_path, read_dataset
from icclim.exception import InvalidIcclimArgumentError
from icclim.logger import IcclimLogger
if TYPE_CHECKING:
from collections.abc import Generator
from xarray.core.dataarray import DataArray
from xarray.core.dataset import Dataset
TMP_STORE_1 = "icclim-tmp-store-1.zarr"
TMP_STORE_2 = "icclim-tmp-store-2.zarr"
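# Dask worker configuration used while rechunking: memory spilling is disabled and
# workers only pause/terminate very close to the memory limit, so that rechunker can
# manage memory on its own (see _get_mem_limit below).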
DEFAULT_DASK_CONF = {
"distributed.worker.memory.target": False,
"distributed.worker.memory.spill": False,
"distributed.worker.memory.pause": "0.95",
"distributed.worker.memory.terminate": "0.98",
}
LOCAL_FILE_SYSTEM = LocalFileSystem()
logger = IcclimLogger.get_instance()
@contextlib.contextmanager
def create_optimized_zarr_store(
in_files: str | list[str] | Dataset | DataArray,
var_names: str | list[str],
target_zarr_store_name: str = "icclim-target-store.zarr",
keep_target_store: bool = False,
chunking: dict[str, int] | None = None,
filesystem: str | AbstractFileSystem = LOCAL_FILE_SYSTEM,
) -> Generator[Dataset]:
"""
    Context manager to create a zarr store from an input NetCDF file or xarray
    structure.

    -- EXPERIMENTAL FEATURE --
    The API may change without deprecation warnings!

    The execution may take a long time.

    The result is rechunked according to the `chunking` schema provided.
    By default, when `chunking` is left to None, the resulting zarr store is NOT
    chunked along the time dimension.
    This kind of chunking significantly speeds up the bootstrapping of percentiles
    for indices such as Tx90p, Tx10p, TN90p...
    But such a chunking will most likely result in suboptimal performance for other
    indices.
    When computing indices that need no bootstrapping, you should first try the
    computation without `create_optimized_zarr_store`.
    If you run into performance issues, you may then use `create_optimized_zarr_store`
    with a chunking schema better suited to your computation than your current
    storage chunking.

    By default, with `keep_target_store` left to False, the resulting zarr store is
    destroyed when the context manager exits.
    To keep the zarr store for future use, set `keep_target_store` to True.
    The output is the resulting zarr store opened as an xarray Dataset.

Parameters
----------
in_files : str | list[str] | Dataset | DataArray
Absolute path(s) to NetCDF dataset(s), including OPeNDAP URLs,
or path to zarr store, or xarray.Dataset or xarray.DataArray.
var_names : str | list[str]
        List of data variables to include in the target zarr store.
        All other data variables are dropped.
        The coordinate variables are untouched and are part of the target zarr store.
target_zarr_store_name : str
Name of the target zarr store.
        Used to avoid overwriting an existing zarr store.
    chunking : dict[str, int] | None
        The target chunking schema. When None, a default chunking schema is used,
        with no chunking along the time dimension.
keep_target_store : bool
Set to True to keep the target zarr store after the execution of the context
manager.
Set to False to remove the target zarr store once execution is finished.
Default is False.
    filesystem : str | AbstractFileSystem
        The fsspec filesystem where the target zarr store will be created.

    Returns
    -------
    Dataset
        A Dataset opened on the newly created target zarr store.

    Examples
    --------
    .. code-block:: python

        import icclim

        with icclim.create_optimized_zarr_store(
            in_files="tasmax.nc",
            var_names="tasmax",
            target_zarr_store_name="tasmax-store.zarr",
            chunking={"time": 42, "lat": 42, "lon": 42},
        ) as tasmax_opti:
            su_out = icclim.index(in_files=tasmax_opti, index_name="su")
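
    When `chunking` is left to None, the default schema ("auto" for every dimension
    and a single chunk over time) is used, which suits bootstrapped percentile
    indices such as Tx90p.
    Below is a minimal sketch of that use case, assuming the same hypothetical
    "tasmax.nc" file and keeping the target store for later reuse:

    .. code-block:: python

        import icclim

        with icclim.create_optimized_zarr_store(
            in_files="tasmax.nc",
            var_names="tasmax",
            target_zarr_store_name="tasmax-unchunked-time.zarr",
            keep_target_store=True,
        ) as tasmax_opti:
            tx90p_out = icclim.index(in_files=tasmax_opti, index_name="tx90p")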
"""
try:
        if isinstance(filesystem, str):
            # Build the concrete fsspec filesystem from its protocol name (e.g. "file").
            filesystem = fsspec.filesystem(filesystem)
_remove_stores(
TMP_STORE_1,
TMP_STORE_2,
target_zarr_store_name,
filesystem=filesystem,
)
yield _unsafe_create_optimized_zarr_store(
in_files,
var_names,
target_zarr_store_name,
chunking,
_get_mem_limit(),
filesystem,
)
finally:
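        # Always clean up the temporary stores; remove the target store as well
        # unless the caller asked to keep it.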
stores_to_remove = [TMP_STORE_1, TMP_STORE_2]
if not keep_target_store:
stores_to_remove.append(target_zarr_store_name)
_remove_stores(*stores_to_remove, filesystem=filesystem)
def _remove_stores(*stores: str, filesystem: AbstractFileSystem) -> None:
for s in stores:
with contextlib.suppress(FileNotFoundError):
filesystem.rm(s, recursive=True, maxdepth=100)
def _unsafe_create_optimized_zarr_store(
in_files: str | list[str] | Dataset | DataArray,
var_name: str | list[str],
zarr_store_name: str,
chunking: dict[str, int] | None,
max_mem: int,
filesystem: AbstractFileSystem,
) -> Dataset:
with dask.config.set(DEFAULT_DASK_CONF):
logger.info("Rechunking in progress, this will take some time.")
is_ds_zarr = is_zarr_path(in_files)
ds = read_dataset(in_files, standard_var=None, var_name=var_name)
        # Drop all non-essential data variables.
ds = ds.drop_vars(filter(lambda v: v not in var_name, ds.data_vars.keys()))
if len(ds.data_vars.keys()) == 0:
msg = f"The variable(s) {var_name} were not found in the dataset."
raise InvalidIcclimArgumentError(msg)
if _is_rechunking_unnecessary(ds, chunking):
msg = (
f"The given input is already chunked following {chunking}."
f" It's unnecessary to rechunk data with"
f" `create_optimized_zarr_store` here."
)
raise InvalidIcclimArgumentError(msg)
        if chunking is None:
            chunking = _build_default_chunking(ds)
# It seems rechunker performs better when the dataset is first converted
# to a zarr store, without rechunking anything.
if not is_ds_zarr:
# needed to have unified chunk that can be written to zarr
ds = ds.chunk("auto").unify_chunks()
ds.to_zarr(TMP_STORE_1, mode="w")
ds = xr.open_zarr(TMP_STORE_1)
ds = ds.chunk(chunking)
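        # Build the per-variable chunk mapping expected by rechunker, and reset the
        # zarr encoding so the previous chunk layout does not conflict with it.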
target_chunks = {}
for data_var in ds.data_vars:
ds[data_var].encoding = {}
acc = {}
for dim in ds[data_var].dims:
acc.update({dim: ds.chunksizes[dim][0]})
target_chunks.update({data_var: acc})
for c in ds.coords:
ds[c].encoding = {}
target_chunks.update({c: None})
rechunk(
source=ds,
target_chunks=target_chunks,
max_mem=max_mem,
target_store=zarr_store_name,
temp_store=TMP_STORE_2,
).execute()
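        # The intermediate stores are no longer needed; consolidate the target
        # store's metadata before opening it as a Dataset.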
_remove_stores(TMP_STORE_1, TMP_STORE_2, filesystem=filesystem)
zarr.consolidate_metadata(zarr_store_name)
return xr.open_zarr(zarr_store_name)
def _build_default_chunking(ds: Dataset) -> dict:
    # Let dask find the best chunking schema for all dimensions but `time`.
chunking = dict.fromkeys(ds.dims, "auto")
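    # -1 asks for a single chunk spanning the whole time dimension.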
chunking["time"] = -1
return chunking
def _is_rechunking_unnecessary(ds: Dataset, chunking: dict[str, int] | None) -> bool:
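    # Rechunking is pointless when no chunking is requested and time already fits in
    # a single chunk, or when the requested chunking matches the current one.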
cp = copy.deepcopy(ds.chunks)
if chunking is None:
return len(ds.chunks["time"]) == 1
return ds.chunk(chunking).chunks == cp
def _get_mem_limit(factor: float = 0.9) -> int:
# According to
# https://github.com/pangeo-data/rechunker/issues/54#issuecomment-700748875
# we should limit rechunk mem usage to around 0.9 and avoid spilling to disk
if factor > 1 or factor < 0:
msg = f"factor was {factor} but, it must be between 0 and 1."
raise ValueError(msg)
try:
import distributed
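        # When a distributed client is available, use a worker's memory limit
        # rather than the machine's total memory.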
max_sys_mem = (
distributed.get_client()
.submit(lambda: distributed.get_worker().memory_limit)
.result()
)
except (ValueError, ImportError):
# Assumes default scheduler is used
max_sys_mem = psutil.virtual_memory().total
return int(factor * max_sys_mem)