# -*- Coding: latin-1 -*-
# Copyright CERFACS (http://cerfacs.fr/)
# Apache License, Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
"""
Main entry point of icclim.
This module expose icclim principal function, notably `index` which is use by the
generated API.
A convenience function `indices` is also exposed to compute multiple indices at once.
"""
from __future__ import annotations
import datetime as dt
import operator
import time
from collections.abc import Callable, Sequence
from functools import reduce
from typing import TYPE_CHECKING, Any, Literal, Union
from warnings import warn
import numpy as np
import xarray as xr
from icclim._core.climate_variable import (
ClimateVariable,
build_climate_vars,
)
from icclim._core.constants import (
ICCLIM_REFERENCE,
RESAMPLE_METHOD,
UNITS_KEY,
)
from icclim._core.generic.indicator import GenericIndicator
from icclim._core.input_parsing import build_input_dict
from icclim._core.legacy.user_index import parse
from icclim._core.model.index_config import IndexConfig
from icclim._core.model.index_group import IndexGroup, IndexGroupRegistry
from icclim._core.model.logical_link import LogicalLinkRegistry
from icclim._core.model.netcdf_version import NetcdfVersion, NetcdfVersionRegistry
from icclim._core.model.quantile_interpolation import (
QuantileInterpolation,
QuantileInterpolationRegistry,
)
from icclim._core.model.standard_index import StandardIndex
from icclim._core.model.threshold import Threshold
from icclim._core.utils import read_date
from icclim.dcsc.registry import DcscIndexRegistry
from icclim.ecad.binding import (
StandardizedPrecipitationIndex3,
StandardizedPrecipitationIndex6,
)
from icclim.ecad.registry import EcadIndexRegistry
from icclim.exception import InvalidIcclimArgumentError
from icclim.frequency import Frequency, FrequencyRegistry
from icclim.generic.registry import GenericIndicatorRegistry
from icclim.logger import IcclimLogger, Verbosity, VerbosityRegistry
from icclim.threshold.factory import build_threshold
if TYPE_CHECKING:
from xarray.core.dataarray import DataArray
from xarray.core.dataset import Dataset
from icclim._core.legacy.user_index.model import UserIndexDict
from icclim._core.model.icclim_types import (
FrequencyLike,
InFileLike,
SamplingMethodLike,
)
from icclim._core.model.in_file_dictionary import InFileDictionary
from icclim._core.model.indicator import Indicator
SliceMode = Union[str, Sequence[str], "Frequency", "FrequencyLike"]
log: IcclimLogger = IcclimLogger.get_instance(VerbosityRegistry.LOW)
HISTORY_CF_KEY = "history"
SOURCE_CF_KEY = "source"
NUMBER_OF_NOTNULL_VAR = "number_of_notnull"
[docs]
def indices(
index_group: Sequence[str] | str | IndexGroup | StandardIndex,
*,
ignore_error: bool = False,
**kwargs,
) -> Dataset:
"""
Compute multiple indices at the same time.
The input dataset(s) must include all the necessary variables.
It can only be used with keyword arguments (kwargs).
Parameters
----------
index_group : "all" | str | IndexGroup | list[str]
Either the name of an IndexGroup or an instance of IndexGroup or a list
of index short names or the name(s) of standard variable(s) (such as 'tasmax').
The value "all" can also be used to compute every indices.
Note that the input given by ``in_files`` must include all the necessary
variables to compute the indices of this group.
ignore_error: bool
When True, ignore indices that fails to compute. This is option is particularly
useful when used with `index_group='all'` to compute everything that can be
computed given the input.
kwargs : Dict
``icclim.index`` keyword arguments.
Returns
-------
xr.Dataset
A Dataset with one data variable per index.
Notes
-----
If ``output_file`` is part of kwargs, the result is written in a single netCDF
file, which will contain all the index results of this group.
"""
indices = _get_ecad_indices_of_group(index_group)
out = None
if "out_file" in kwargs:
out = kwargs["out_file"]
del kwargs["out_file"]
acc = []
for i in indices:
log.info("Computing index %s", i.short_name)
kwargs["index_name"] = i.short_name
try:
res = index(**kwargs)
res = _rename_coords(res, i.short_name)
res = _drop_group_auxiliary_vars(res)
acc.append(res)
except Exception:
if ignore_error:
warn(f"Could not compute {i.short_name}.", stacklevel=2)
else:
raise
ds: Dataset = xr.merge(acc, compat="no_conflicts", join="outer")
if out is not None:
_write_output_file(
result_ds=ds,
input_time_encoding=ds.time.encoding,
netcdf_version=kwargs.get("netcdf_version", NetcdfVersionRegistry.NETCDF4),
file_path=out,
)
return ds
def _get_ecad_indices_of_group(
query: Sequence[str] | str | IndexGroup | StandardIndex,
) -> list[StandardIndex]:
if query == IndexGroupRegistry.WILD_CARD_GROUP or (
isinstance(query, str)
and query.lower() == IndexGroupRegistry.WILD_CARD_GROUP.name
):
return EcadIndexRegistry.values()
query_list = query if isinstance(query, (list, tuple)) else [query]
indices = _look_for_standard_indices(query_list)
if indices:
return indices
indices = _look_for_variable_indices(query_list)
if indices:
return indices
indices = _look_for_index_groups(query_list)
if indices:
return indices
msg = f"The index group {query} was not recognized."
raise InvalidIcclimArgumentError(msg)
def _look_for_standard_indices(query: Sequence[Any]) -> list[StandardIndex] | None:
indices = [EcadIndexRegistry.lookup_no_error(i) for i in query]
res = [i for i in indices if i is not None]
if len(res) == len(query):
return res
return None
def _look_for_variable_indices(query: Sequence[Any]) -> list[StandardIndex] | None:
indices = []
for ecad_index in EcadIndexRegistry.values():
has_var = True
if ecad_index.input_variables is not None:
for var in ecad_index.input_variables:
is_query_in_aliases = (
standard_var in var.aliases for standard_var in query
)
has_var &= any(is_query_in_aliases)
else:
has_var = False
if has_var:
indices.append(ecad_index)
if len(indices) >= len(query):
return indices
return None
def _look_for_index_groups(query: Sequence[Any]) -> list[StandardIndex] | None:
groups = [IndexGroupRegistry.lookup_no_error(i) for i in query]
res = [i for i in groups if i is not None]
if res:
indices = [x.get_indices() for x in res]
return reduce(operator.add, indices, []) # flatten list[list]
return None
[docs]
def indice(*args, **kwargs) -> Dataset:
"""
Proxy for `icclim.index` function.
Deprecated: to be deleted in a future release.
"""
log.deprecation_warning(old="icclim.indice", new="icclim.index")
return index(*args, **kwargs)
[docs]
def index(
in_files: InFileLike,
index_name: str | GenericIndicator | StandardIndex | None = None,
var_name: str | Sequence[str] | None = None,
slice_mode: FrequencyLike | Frequency = "year",
time_range: Sequence[dt.datetime | str] | None = None,
out_file: str | None = None,
threshold: str | Threshold | Sequence[str | Threshold] | None = None,
callback: Callable[[int], None] = log.callback,
callback_percentage_start_value: int = 0,
callback_percentage_total: int = 100,
base_period_time_range: Sequence[dt.datetime] | Sequence[str] | None = None,
doy_window_width: int = 5,
only_leap_years: bool = False,
ignore_Feb29th: bool = False,
interpolation: str | QuantileInterpolation = "median_unbiased",
out_unit: str | None = None,
netcdf_version: str | NetcdfVersion = "NETCDF4",
user_index: UserIndexDict | None = None,
save_thresholds: bool = False,
logs_verbosity: Verbosity | str = "LOW",
date_event: bool = False,
min_spell_length: int | None = 6,
rolling_window_width: int | None = 5,
sampling_method: SamplingMethodLike = RESAMPLE_METHOD,
run_index: str | None = "first",
allow_partial_seasons: bool | Literal["start", "end"] = False,
*,
# deprecated params are kwargs only
window_width: int | None = None,
save_percentile: bool | None = None,
indice_name: str | None = None,
user_indice: UserIndexDict | None = None,
transfer_limit_Mbytes: float | None = None,
) -> Dataset:
"""
Compute climate index.
This is the main entry point for icclim.
.. warning::
The ``user_index`` parameter is deprecated.
Please use the generic indices API instead.
Parameters
----------
in_files: str | list[str] | Dataset | DataArray | InputDictionary
Absolute path(s) to NetCDF dataset(s), including OPeNDAP URLs,
or path to zarr store, or xarray.Dataset or xarray.DataArray.
index_name: str | StandardIndex
Climate index name.
For ECA&D index, case insensitive name used to lookup the index.
For user index, it's the name of the output variable.
var_name: str | list[str] | None
``optional`` Target variable name to process corresponding to ``in_files``.
If None (default) on ECA&D index, the variable is guessed based on the
climate index wanted.
Mandatory for a user index.
slice_mode: FrequencyLike | Frequency
Type of temporal aggregation:
The possibles values are ``{"year", "month", "DJF", "MAM", "JJA", "SON",
"ONDJFM" or "AMJJAS", ("season", [1,2,3]), ("month", [1,2,3,])}``
(where season and month lists can be customized) or any valid pandas
frequency.
A season can also be defined between two exact dates:
``("season", ("19 july", "14 august"))``.
Spatially varying seasons can be defined by providing a tuple of
two ``xarray.DataArray`` objects (start day-of-year and end day-of-year):
``(start_da, end_da)``.
Default is "year".
See :ref:`slice_mode` for details.
time_range: list[datetime.datetime ] | list[str] | tuple[str, str] | None
``optional`` Temporal range: upper and lower bounds for temporal subsetting.
If ``None``, whole period of input files will be processed.
The dates can either be given as instance of datetime.datetime or as string
values. For strings, many format are accepted.
Default is ``None``.
out_file: str | None
Output NetCDF file name (default: "icclim_out.nc" in the current directory).
Default is "icclim_out.nc".
If the input ``in_files`` is a ``Dataset``, ``out_file`` field is ignored.
Use the function returned value instead to retrieve the computed value.
If ``out_file`` already exists, icclim will overwrite it!
threshold: float | list[float] | None
``optional`` User defined threshold for certain indices.
Default depend on the index, see their individual definition.
When a list of threshold is provided, the index will be computed for each
thresholds.
transfer_limit_Mbytes: float
Deprecated, does not have any effect.
callback: Callable[[int], None]
``optional`` Progress bar printing. If ``None``, progress bar will not be
printed.
callback_percentage_start_value: int
``optional`` Initial value of percentage of the progress bar (default: 0).
callback_percentage_total: int
``optional`` Total percentage value (default: 100).
base_period_time_range: list[datetime.datetime ] | list[str] | tuple[str, str] | None
``optional`` Temporal range of the reference period.
The dates can either be given as instance of datetime.datetime or as string
values.
It is used either:
#. to compute percentiles if threshold is filled.
When missing, the studied period is used to compute percentiles.
The study period is either the dataset filtered by `time_range` or the whole
dataset if `time_range` is missing.
For day of year percentiles (doy_per), on extreme percentiles the
overlapping period between `base_period_time_range` and the study period is
bootstrapped.
#. to compute a reference period for indices such as difference_of_mean
(a.k.a anomaly) if a single variable is given in input.
doy_window_width: int
``optional`` Window width used to aggreagte day of year values when computing
day of year percentiles (doy_per)
Default: 5 (5 days).
min_spell_length: int
``optional`` Minimum spell duration to be taken into account when computing
the sum_of_spell_lengths.
rolling_window_width: int
``optional`` Window width of the rolling window for indicators such as
`{max_of_rolling_sum, max_of_rolling_average, min_of_rolling_sum, min_of_rolling_average}`
run_index: str | None
``optional`` The index to use for the run length encoding (e.g. "first", "last", "mid").
Default is "first".
Ignored for non spell indices.
only_leap_years: bool
``optional`` Option for February 29th (default: False).
ignore_Feb29th: bool
``optional`` Ignoring or not February 29th (default: False).
interpolation: str | QuantileInterpolation | None
``optional`` Interpolation method to compute percentile values:
``{"linear", "median_unbiased"}``
Default is "median_unbiased", a.k.a type 8 or method 8.
Ignored for non percentile based indices.
out_unit: str | None
``optional`` Output unit for certain indices: "days" or "%"
(default: "days").
netcdf_version: str | NetcdfVersion
``optional`` NetCDF version to create (default: "NETCDF3_CLASSIC").
user_index: UserIndexDict
``optional`` A dictionary with parameters for user defined index.
**Deprecated**, use generic indices instead.
See :ref:`Custom indices`.
Ignored for ECA&D indices.
save_thresholds: bool
``optional`` True if the thresholds should be saved within the resulting
netcdf file (default: False).
date_event: bool
When True the date of the event (such as when a maximum is reached) will be
stored in coordinates variables.
**warning** This option may significantly slow down computation.
logs_verbosity: str | Verbosity
``optional`` Configure how verbose icclim is.
Possible values: ``{"LOW", "HIGH", "SILENT"}`` (default: "LOW")
sampling_method: str
Choose whether the output sampling configured in `slice_mode` is a
`groupby` operation or a `resample` operation (as per xarray definitions).
Possible values:
``{"groupby", "resample", "groupby_ref_and_resample_study"}``
(default: "resample")
`groupby_ref_and_resample_study` may only be used when computing the
`difference_of_means` (a.k.a the anomaly).
indice_name: str | None
DEPRECATED, use index_name instead.
user_indice: dict | None
DEPRECATED, use user_index instead.
window_width: int
DEPRECATED, use doy_window_width, min_spell_length or rolling_window_width
instead.
save_percentile: bool
DEPRECATED, use save_thresholds instead.
allow_partial_seasons : bool | "start" | "end"
Flag indicating whether to allow partial seasons to be included in the
index calculation.
- True: Unmasks both the first and last periods.
- False: Masks any incomplete periods (standard behavior).
- "start": Unmasks only the first period.
- "end": Unmasks only the last period.
Default is False.
Examples
--------
Compute Summer Days (SU) from an in-memory xarray DataArray:
>>> import numpy as np, pandas as pd, xarray as xr, icclim
>>> time = pd.date_range("2000-01-01", periods=365, freq="D")
>>> tasmax = xr.DataArray(
... np.full(365, 303.15),
... coords={"time": time},
... dims=["time"],
... attrs={"units": "K"},
... )
>>> result = icclim.index(in_files=tasmax, index_name="SU", var_name="tasmax")
>>> int(result["SU"].isel(time=0).values)
365
Compute a generic index with spatially varying seasons:
>>> import numpy as np, pandas as pd, xarray as xr, icclim
>>> time = pd.date_range("2000-01-01", periods=366, freq="D")
>>> tas = xr.DataArray(
... np.full((366, 1, 2), 300.0),
... coords={"time": time, "lat": [45], "lon": [5, 10]},
... dims=["time", "lat", "lon"],
... attrs={"units": "K"},
... )
>>> # Pixel 1: season is JJA (doy 153 to 244 in leap year)
>>> # Pixel 2: season is SON (doy 245 to 335 in leap year)
>>> start = xr.DataArray(
... [[153, 245]], dims=["lat", "lon"], coords={"lat": [45], "lon": [5, 10]}
... )
>>> end = xr.DataArray(
... [[244, 335]], dims=["lat", "lon"], coords={"lat": [45], "lon": [5, 10]}
... )
>>> result = icclim.index(
... in_files=tas,
... index_name="TG",
... slice_mode=(start, end),
... )
>>> # TG is the mean temperature over the season.
>>> # Since all values are 300K (26.85°C), the result should be ~26.85
>>> round(float(result["TG"].isel(time=0, lat=0, lon=0).values), 2)
26.85
Compute an index with an incomplete season at the end (Hellmann style):
>>> import numpy as np, pandas as pd, xarray as xr, icclim
>>> time = pd.date_range("2020-01-01", "2021-12-31", freq="D")
>>> tas = xr.DataArray(
... np.full(len(time), 30.0),
... coords={"time": time},
... dims=["time"],
... attrs={"units": "degC"},
... )
>>> slice_mode = ("season", ("1 november", "31 march"))
>>> result = icclim.index(
... in_files=tas,
... index_name="SU",
... slice_mode=slice_mode,
... allow_partial_seasons=True,
... )
>>> # The last season (2021-11-01 to 2022-03-31) is partial (61 days in 2021)
>>> int(result["SU"].values[-1])
61
"""
_setup(callback, callback_percentage_start_value, logs_verbosity)
(
index_name,
user_index,
save_thresholds,
doy_window_width,
) = _handle_deprecated_params(
index_name,
user_index,
save_thresholds,
indice_name,
transfer_limit_Mbytes,
user_indice,
save_percentile,
window_width,
doy_window_width,
)
del indice_name, transfer_limit_Mbytes, user_indice, save_percentile, window_width
config = _build_config(
in_files=in_files,
index_name=index_name,
var_name=var_name,
slice_mode=slice_mode,
time_range=time_range,
threshold=threshold,
callback=callback,
base_period_time_range=base_period_time_range,
doy_window_width=doy_window_width,
only_leap_years=only_leap_years,
ignore_feb29th=ignore_Feb29th,
interpolation=interpolation,
out_unit=out_unit,
netcdf_version=netcdf_version,
user_index=user_index,
save_thresholds=save_thresholds,
date_event=date_event,
min_spell_length=min_spell_length,
rolling_window_width=rolling_window_width,
sampling_method=sampling_method,
run_index=run_index,
allow_partial_seasons=allow_partial_seasons,
)
result_ds = _compute_climate_index(
climate_index=config.indicator,
config=config,
initial_history=config.climate_variables[0].global_metadata["history"],
initial_source=config.climate_variables[0].global_metadata["source"],
rename=config.rename,
reference=config.reference,
)
if out_file is not None:
_write_output_file(
result_ds,
config.climate_variables[0].global_metadata["time_encoding"],
config.netcdf_version,
out_file,
)
callback(callback_percentage_total)
log.ending_message(time.process_time())
return result_ds
def _build_config(
in_files: InFileLike,
index_name: str | GenericIndicator | StandardIndex | None,
var_name: str | Sequence[str] | None,
slice_mode: FrequencyLike | Frequency,
time_range: Sequence[dt.datetime | str] | None,
threshold: str | Threshold | Sequence[str | Threshold] | None,
callback: Callable[[int], None],
base_period_time_range: Sequence[dt.datetime] | Sequence[str] | None,
doy_window_width: int,
only_leap_years: bool,
ignore_feb29th: bool,
interpolation: str | QuantileInterpolation,
out_unit: str | None,
netcdf_version: str | NetcdfVersion,
user_index: UserIndexDict | None,
save_thresholds: bool,
date_event: bool,
min_spell_length: int | None,
rolling_window_width: int | None,
sampling_method: SamplingMethodLike,
run_index: str | None,
allow_partial_seasons: bool | Literal["start", "end"],
) -> IndexConfig:
if user_index is not None and (index_name is None or isinstance(index_name, str)):
return _build_user_index_config(
user_index,
in_files=in_files,
index_name=index_name,
var_name=var_name,
slice_mode=slice_mode,
time_range=time_range,
callback=callback,
base_period_time_range=base_period_time_range,
doy_window_width=doy_window_width,
only_leap_years=only_leap_years,
ignore_feb29th=ignore_feb29th,
interpolation=interpolation,
out_unit=out_unit,
netcdf_version=netcdf_version,
save_thresholds=save_thresholds,
date_event=date_event,
min_spell_length=min_spell_length,
rolling_window_width=rolling_window_width,
sampling_method=sampling_method,
run_index=run_index,
allow_partial_seasons=allow_partial_seasons,
)
if index_name is not None:
return _build_standard_index_config(
in_files=in_files,
index_name=index_name,
var_name=var_name,
slice_mode=slice_mode,
time_range=time_range,
threshold=threshold,
callback=callback,
base_period_time_range=base_period_time_range,
doy_window_width=doy_window_width,
only_leap_years=only_leap_years,
ignore_feb29th=ignore_feb29th,
interpolation=interpolation,
out_unit=out_unit,
netcdf_version=netcdf_version,
save_thresholds=save_thresholds,
date_event=date_event,
min_spell_length=min_spell_length,
rolling_window_width=rolling_window_width,
sampling_method=sampling_method,
run_index=run_index,
allow_partial_seasons=allow_partial_seasons,
)
msg = "You must fill either index_name or user_indexto compute a climate index."
raise InvalidIcclimArgumentError(msg)
def _get_reference_period(
base_period_time_range: Sequence[dt.datetime | str] | None,
) -> tuple | None:
if base_period_time_range is not None:
return tuple(
(read_date(t).strftime("%m-%d-%Y") for t in base_period_time_range),
)
return None
def _parse_threshold(
threshold: str | Threshold | Sequence[str | Threshold] | None,
doy_window_width: int,
reference_period: Sequence[dt.datetime | str] | None,
only_leap_years: bool,
interpolation: QuantileInterpolation,
) -> Threshold | Sequence[Threshold] | None:
if isinstance(threshold, Threshold):
return threshold
if isinstance(threshold, (str, dict)):
return _build_threshold(
threshold,
doy_window_width=doy_window_width,
reference_period=reference_period,
only_leap_years=only_leap_years,
interpolation=interpolation,
)
if isinstance(threshold, Sequence):
return [
_build_threshold(
t,
doy_window_width=doy_window_width,
reference_period=reference_period,
only_leap_years=only_leap_years,
interpolation=interpolation,
)
for t in threshold
]
return None
def _build_user_index_config(
user_index: UserIndexDict,
in_files: InFileLike,
index_name: str | None,
var_name: str | Sequence[str] | None,
slice_mode: FrequencyLike | Frequency,
time_range: Sequence[dt.datetime | str] | None,
callback: Callable[[int], None],
base_period_time_range: Sequence[dt.datetime] | Sequence[str] | None,
doy_window_width: int,
only_leap_years: bool,
ignore_feb29th: bool,
interpolation: str | QuantileInterpolation,
out_unit: str | None,
netcdf_version: str | NetcdfVersion,
save_thresholds: bool,
date_event: bool,
min_spell_length: int | None,
rolling_window_width: int | None,
sampling_method: SamplingMethodLike,
run_index: str | None,
allow_partial_seasons: bool | Literal["start", "end"],
) -> IndexConfig:
interpolation = QuantileInterpolationRegistry.lookup(interpolation)
indicator = parse.read_indicator(user_index)
sampling_frequency = FrequencyRegistry.lookup(slice_mode) # type: ignore[arg-type]
threshold = parse.read_thresholds(
user_index,
doy_window_width=doy_window_width,
reference_period=base_period_time_range,
only_leap_years=only_leap_years,
interpolation=interpolation,
)
logical_link = parse.read_logical_link(user_index)
coef = parse.read_coef(user_index)
date_event = parse.read_date_event(user_index)
rename = index_name or user_index.get("index_name", None) or "user_index"
output_unit = out_unit
rolling_window_width = user_index.get("window_width", rolling_window_width)
reference_period = _get_reference_period(
user_index.get("ref_time_range", base_period_time_range)
)
climate_vars_dict = build_input_dict(
in_files=in_files,
var_names=var_name,
threshold=threshold,
standard_index=None,
)
is_compared_to_ref = _must_add_reference_var(climate_vars_dict, reference_period)
climate_vars = build_climate_vars(
climate_vars_dict=climate_vars_dict,
ignore_feb29th=ignore_feb29th,
time_range=time_range,
base_period=reference_period,
standard_index=None,
is_compared_to_reference=is_compared_to_ref,
)
return IndexConfig(
save_thresholds=save_thresholds,
frequency=sampling_frequency,
climate_variables=climate_vars,
min_spell_length=min_spell_length,
rolling_window_width=rolling_window_width,
out_unit=output_unit,
netcdf_version=NetcdfVersionRegistry.lookup(netcdf_version),
interpolation=interpolation,
callback=callback,
is_compared_to_reference=is_compared_to_ref,
reference_period=reference_period,
indicator_name=indicator.name,
logical_link=logical_link,
coef=coef,
date_event=date_event,
sampling_method=sampling_method,
rename=rename,
indicator=indicator,
reference=ICCLIM_REFERENCE,
run_index=run_index,
allow_partial_seasons=allow_partial_seasons,
)
def _build_standard_index_config(
in_files: InFileLike,
index_name: str | GenericIndicator | StandardIndex,
var_name: str | Sequence[str] | None,
slice_mode: FrequencyLike | Frequency,
time_range: Sequence[dt.datetime | str] | None,
threshold: str | Threshold | Sequence[str | Threshold] | None,
callback: Callable[[int], None],
base_period_time_range: Sequence[dt.datetime] | Sequence[str] | None,
doy_window_width: int,
only_leap_years: bool,
ignore_feb29th: bool,
interpolation: str | QuantileInterpolation,
out_unit: str | None,
netcdf_version: str | NetcdfVersion,
save_thresholds: bool,
date_event: bool,
min_spell_length: int | None,
rolling_window_width: int | None,
sampling_method: SamplingMethodLike,
run_index: str | None,
allow_partial_seasons: bool | Literal["start", "end"],
) -> IndexConfig:
interpolation = QuantileInterpolationRegistry.lookup(interpolation)
# logical link here link two climate_variable computations as with user_index.
# It isalways AND for standard indices.
logical_link = LogicalLinkRegistry.LOGICAL_AND
sampling_frequency = FrequencyRegistry.lookup(slice_mode) # type: ignore[arg-type]
coef = None
indicator_info = _parse_indicator_config(index_name, out_unit, threshold)
standard_index = indicator_info["standard_index"]
indicator = indicator_info["indicator"]
threshold = indicator_info["threshold"]
rename = indicator_info["rename"]
output_unit = indicator_info["output_unit"]
reference = indicator_info["reference"]
indicator_name = indicator_info["indicator_name"]
reference_period = _get_reference_period(base_period_time_range)
threshold = _parse_threshold(
threshold,
doy_window_width=doy_window_width,
reference_period=reference_period,
only_leap_years=only_leap_years,
interpolation=interpolation,
)
climate_vars_dict = build_input_dict(
in_files=in_files,
var_names=var_name,
threshold=threshold,
standard_index=standard_index,
)
is_compared_to_ref = _must_add_reference_var(climate_vars_dict, reference_period)
climate_vars = build_climate_vars(
climate_vars_dict=climate_vars_dict,
ignore_feb29th=ignore_feb29th,
time_range=time_range,
base_period=reference_period,
standard_index=standard_index,
is_compared_to_reference=is_compared_to_ref,
)
return IndexConfig(
save_thresholds=save_thresholds,
frequency=sampling_frequency,
climate_variables=climate_vars,
min_spell_length=min_spell_length,
rolling_window_width=rolling_window_width,
out_unit=output_unit,
netcdf_version=NetcdfVersionRegistry.lookup(netcdf_version),
interpolation=interpolation,
callback=callback,
is_compared_to_reference=is_compared_to_ref,
reference_period=reference_period,
indicator_name=indicator_name,
logical_link=logical_link,
coef=coef,
date_event=date_event,
sampling_method=sampling_method,
rename=rename,
indicator=indicator,
reference=reference,
run_index=run_index,
allow_partial_seasons=allow_partial_seasons,
)
def _parse_index_kind(
index_name: StandardIndex | GenericIndicator | str,
) -> StandardIndex | GenericIndicator:
if isinstance(index_name, str):
res: StandardIndex | GenericIndicator | None
res = EcadIndexRegistry.lookup_no_error(index_name)
if res is None:
res = DcscIndexRegistry.lookup_no_error(index_name)
if res is None:
res = GenericIndicatorRegistry.lookup(index_name)
return res
return index_name
[docs]
def _write_output_file(
result_ds: xr.Dataset,
input_time_encoding: dict | None,
netcdf_version: NetcdfVersion,
file_path: str,
) -> None:
"""Write `result_ds` to a netCDF file on `out_file` path."""
if input_time_encoding:
time_encoding = {
"calendar": input_time_encoding.get("calendar"),
UNITS_KEY: input_time_encoding.get(UNITS_KEY),
# Force float64 to avoid serialization warning
"dtype": np.float64,
}
else:
time_encoding = {
UNITS_KEY: "days since 1850-1-1",
"dtype": np.float64, # force float
}
result_ds.to_netcdf(
file_path,
format=netcdf_version.name,
encoding={"time": time_encoding},
)
def _handle_deprecated_params(
index_name: str | GenericIndicator | StandardIndex | None,
user_index: UserIndexDict | None,
save_thresholds: bool,
indice_name: str | None,
transfer_limit_mbytes: float | None,
user_indice: UserIndexDict | None,
save_percentile: bool | None,
window_width: int | None,
doy_window_width: int | None,
) -> tuple[str, UserIndexDict, bool, int]:
if indice_name is not None:
log.deprecation_warning(old="indice_name", new="index_name")
index_name = indice_name
if user_indice is not None:
log.deprecation_warning(old="user_indice", new="user_index")
user_index = user_indice
if transfer_limit_mbytes is not None:
log.deprecation_warning(old="transfer_limit_Mbytes")
if save_percentile is not None:
log.deprecation_warning(old="save_percentile", new="save_thresholds")
save_thresholds = save_percentile
if window_width is not None:
log.deprecation_warning(old="window_width", new="doy_window_width")
doy_window_width = window_width
return ( # type: ignore[return-value]
index_name,
user_index,
save_thresholds,
doy_window_width,
)
def _setup(
callback: Callable[[int], None],
callback_start_value: int,
logs_verbosity: Verbosity | str,
) -> None:
import xclim # noqa: PLC0415
# make xclim input daily check a warning instead of an error
# TODO @bzah: it might be safer to feed a context manager which will setup
# and teardown these confs
# https://github.com/cerfacs-globc/icclim/issues/289
xclim.set_options(data_validation="warn")
# keep attributes through xarray operations
xr.set_options(keep_attrs=True)
log.set_verbosity(logs_verbosity)
log.start_message()
callback(callback_start_value)
def _get_unit(output_unit: str | None, da: DataArray) -> str | None:
da_unit = da.attrs.get(UNITS_KEY, None)
if da_unit is None:
if output_unit is None:
warn(
"No unit computed or provided for the index was found."
" Use out_unit parameter to add one.",
stacklevel=2,
)
return ""
return output_unit
return da_unit
def _compute_climate_index(
climate_index: Indicator,
config: IndexConfig,
initial_history: str | None,
initial_source: str | None,
reference: str,
rename: str | None = None,
) -> Dataset:
result_da = climate_index(config)
if rename:
result_da = result_da.rename(rename)
else:
result_da = result_da.rename(climate_index.name)
result_da.attrs[UNITS_KEY] = _get_unit(config.out_unit, result_da)
if (
config.frequency.post_processing is not None
and "time" in result_da.dims
and not isinstance(
climate_index,
(
StandardizedPrecipitationIndex6,
StandardizedPrecipitationIndex3,
),
)
):
resampled_da, time_bounds = config.frequency.post_processing(result_da)
result_ds = resampled_da.to_dataset()
if time_bounds is not None:
result_ds.coords["time_bounds"] = time_bounds
result_ds.time.attrs["bounds"] = "time_bounds"
else:
result_ds = result_da.to_dataset()
if config.save_thresholds:
result_ds = xr.merge(
[result_ds, _format_thresholds_for_export(config.climate_variables)],
)
history = _build_history(result_da, config, initial_history, climate_index)
return _add_ecad_index_metadata(
result_ds,
climate_index,
history,
initial_source,
reference,
)
def _add_ecad_index_metadata(
result_ds: Dataset,
computed_index: Indicator,
history: str,
initial_source: str | None,
reference: str,
) -> Dataset:
result_ds.attrs.update(
{
"title": computed_index.standard_name,
"references": reference,
"institution": "Climate impact portal (https://climate4impact.eu)",
"history": history,
"source": initial_source if initial_source is not None else "",
"Conventions": "CF-1.6",
},
)
try:
result_ds.lat.encoding["_FillValue"] = None
result_ds.lon.encoding["_FillValue"] = None
except AttributeError:
try:
result_ds.latitude.encoding["_FillValue"] = None
result_ds.longitude.encoding["_FillValue"] = None
except AttributeError:
pass
return result_ds
def _build_history(
result_da: DataArray,
config: IndexConfig,
initial_history: str | None,
indice_computed: Indicator,
) -> str:
from icclim import __version__ as icclim_version # noqa: PLC0415
if initial_history is None:
# get xclim history
initial_history = result_da.attrs[HISTORY_CF_KEY]
else:
# append xclim history
initial_history = f"{initial_history}\n{result_da.attrs[HISTORY_CF_KEY]}"
del result_da.attrs[HISTORY_CF_KEY]
current_time = dt.datetime.now(tz=dt.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S",
)
return (
f"{initial_history}\n"
f" [{current_time}]"
f" Calculation of {indice_computed.name}"
f" index ({config.frequency.adjective})"
f" - icclim version: {icclim_version}"
)
def _build_threshold(
threshold: str | dict | Threshold,
doy_window_width: int,
reference_period: Sequence[dt.datetime | str] | None,
only_leap_years: bool,
interpolation: QuantileInterpolation,
) -> Threshold:
if isinstance(threshold, Threshold):
return threshold
if isinstance(threshold, dict):
kwargs = {
"doy_window_width": doy_window_width,
"reference_period": reference_period,
"only_leap_years": only_leap_years,
"interpolation": interpolation,
**threshold,
}
return build_threshold(**kwargs)
return build_threshold(
threshold,
doy_window_width=doy_window_width,
reference_period=reference_period,
only_leap_years=only_leap_years,
interpolation=interpolation,
)
def _format_thresholds_for_export(climate_vars: list[ClimateVariable]) -> Dataset:
return xr.merge(
[v for v in (_format_threshold(v) for v in climate_vars) if v is not None],
)
def _format_threshold(cf_var: ClimateVariable) -> DataArray | None:
if cf_var.threshold is not None and cf_var.threshold.value is not None:
val = cf_var.threshold.value
if isinstance(val, xr.DataArray):
return val.rename(cf_var.name + "_thresholds").reindex() # type: ignore[return-value]
if isinstance(val, xr.Dataset):
return val.rename(
dict.fromkeys(val.data_vars, cf_var.name + "_thresholds")
).reindex() # type: ignore[arg-type]
return None
[docs]
def _must_add_reference_var(
climate_vars_dict: dict[str, InFileDictionary],
reference_period: Sequence[str] | None,
) -> bool:
"""
Check if the reference variable must be added to the input variables.
Return True whenever the input has no threshold and only one studied variable but
there is a reference period.
Example case: the anomaly of tx(1960-2100) by tx(1960-1990).
"""
t = next(iter(climate_vars_dict.values())).get("thresholds", None)
return t is None and len(climate_vars_dict) == 1 and reference_period is not None
def _rename_coords(ds: Dataset, short_name: str) -> Dataset:
if "percentiles" in ds.coords:
ds = ds.rename({"percentiles": short_name + "_percentiles"})
if "thresholds" in ds.coords:
ds = ds.rename({"thresholds": short_name + "_thresholds"})
return ds
def _drop_group_auxiliary_vars(ds: Dataset) -> Dataset:
return ds.drop_vars(NUMBER_OF_NOTNULL_VAR, errors="ignore")
def _parse_indicator_config(
index_name: str | GenericIndicator | StandardIndex,
out_unit: str | None,
threshold: str | Threshold | Sequence[str | Threshold] | None,
) -> dict[str, Any]:
index = _parse_index_kind(index_name)
if isinstance(index, StandardIndex):
standard_index = index.clone()
indicator = standard_index.indicator.clone()
return {
"standard_index": standard_index,
"indicator": indicator,
"threshold": threshold
if threshold is not None
else standard_index.threshold,
"rename": standard_index.short_name,
"output_unit": out_unit or standard_index.output_unit,
"reference": standard_index.reference,
"indicator_name": standard_index.short_name,
}
if isinstance(index, GenericIndicator):
indicator = index.clone()
return {
"standard_index": None,
"indicator": indicator,
"threshold": threshold,
"rename": None,
"output_unit": out_unit,
"reference": ICCLIM_REFERENCE,
"indicator_name": indicator.name,
}
err = f"Unknown index_name : `{index_name}`"
raise InvalidIcclimArgumentError(err)