# -*- coding: latin-1 -*-
# Copyright CERFACS (http://cerfacs.fr/)
# Apache License, Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
"""
Main entry point of icclim.
This module expose icclim principal function, notably `index` which is use by the
generated API.
A convenience function `indices` is also exposed to compute multiple indices at once.
"""
from __future__ import annotations
import datetime as dt
import operator
import time
from collections.abc import Sequence
from functools import reduce
from typing import TYPE_CHECKING, Callable
from warnings import warn
import xarray as xr
import xclim
from icclim._core.climate_variable import (
ClimateVariable,
build_climate_vars,
)
from icclim._core.constants import (
RESAMPLE_METHOD,
UNITS_KEY,
)
from icclim._core.generic.indicator import GenericIndicator
from icclim._core.input_parsing import build_input_dict
from icclim._core.legacy.user_index import parse
from icclim._core.model.index_config import IndexConfig
from icclim._core.model.index_group import IndexGroup, IndexGroupRegistry
from icclim._core.model.logical_link import LogicalLinkRegistry
from icclim._core.model.netcdf_version import NetcdfVersion, NetcdfVersionRegistry
from icclim._core.model.quantile_interpolation import (
QuantileInterpolation,
QuantileInterpolationRegistry,
)
from icclim._core.model.standard_index import StandardIndex
from icclim._core.model.threshold import Threshold
from icclim._core.utils import read_date
from icclim.dcsc.registry import DcscIndexRegistry
from icclim.ecad.binding import (
StandardizedPrecipitationIndex3,
StandardizedPrecipitationIndex6,
)
from icclim.ecad.registry import EcadIndexRegistry
from icclim.exception import InvalidIcclimArgumentError
from icclim.frequency import Frequency, FrequencyRegistry
from icclim.generic.registry import GenericIndicatorRegistry
from icclim.logger import IcclimLogger, Verbosity, VerbosityRegistry
from icclim.threshold.factory import build_threshold
if TYPE_CHECKING:
from xarray.core.dataarray import DataArray
from xarray.core.dataset import Dataset
from icclim._core.legacy.user_index.model import UserIndexDict
from icclim._core.model.icclim_types import (
FrequencyLike,
InFileLike,
SamplingMethodLike,
)
from icclim._core.model.in_file_dictionary import InFileDictionary
from icclim._core.model.indicator import Indicator
log: IcclimLogger = IcclimLogger.get_instance(VerbosityRegistry.LOW)
HISTORY_CF_KEY = "history"
SOURCE_CF_KEY = "source"
ICCLIM_REFERENCE = "icclim"
def indices(
index_group: Sequence[str] | str | IndexGroup | StandardIndex,
*,
ignore_error: bool = False,
**kwargs,
) -> Dataset:
"""
Compute multiple indices at the same time.
The input dataset(s) must include all the necessary variables.
It can only be used with keyword arguments (kwargs).
Parameters
----------
    index_group : "all" | str | IndexGroup | list[str]
        Either the name of an IndexGroup, an instance of IndexGroup, a list of
        index short names, or the name(s) of standard variable(s) (such as
        'tasmax').
        The value "all" can also be used to compute every index.
        Note that the input given by ``in_files`` must include all the necessary
        variables to compute the indices of this group.
    ignore_error : bool
        When True, ignore indices that fail to compute. This option is particularly
        useful when used with `index_group='all'` to compute everything that can be
        computed given the input.
kwargs : Dict
``icclim.index`` keyword arguments.
Returns
-------
xr.Dataset
A Dataset with one data variable per index.
Notes
-----
    If ``out_file`` is part of kwargs, the result is written to a single netCDF
    file, which will contain all the index results of this group.
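
    Examples
    --------
    A hedged sketch; the file name is illustrative and assumes a dataset
    providing the variables needed by the HEAT group:

    >>> import icclim
    >>> heat_ds = icclim.indices(
    ...     index_group="HEAT", in_files="climate_data.nc", slice_mode="year"
    ... )  # doctest: +SKIP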
"""
indices = _get_ecad_indices_of_group(index_group)
out = None
if "out_file" in kwargs:
out = kwargs["out_file"]
del kwargs["out_file"]
acc = []
for i in indices:
log.info("Computing index %s", i.short_name)
kwargs["index_name"] = i.short_name
        try:
            res = index(**kwargs)
        except Exception:  # noqa: BLE001 (catch everything)
            if not ignore_error:
                raise
            warn(f"Could not compute {i.short_name}.", stacklevel=2)
            continue
        # Rename percentile/threshold coordinates to avoid collisions when the
        # per-index results are merged into a single dataset.
        if "percentiles" in res.coords:
            res = res.rename({"percentiles": i.short_name + "_percentiles"})
        if "thresholds" in res.coords:
            res = res.rename({"thresholds": i.short_name + "_thresholds"})
        acc.append(res)
ds: Dataset = xr.merge(acc)
if out is not None:
_write_output_file(
result_ds=ds,
input_time_encoding=ds.time.encoding,
netcdf_version=kwargs.get("netcdf_version", NetcdfVersionRegistry.NETCDF4),
file_path=out,
)
return ds
def _get_ecad_indices_of_group(
query: Sequence[str] | str | IndexGroup | StandardIndex,
) -> list[StandardIndex]:
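    """
    Resolve `query` into a list of ECA&D standard indices.

    Resolution order, with illustrative queries: "all" returns every ECA&D
    index; ["TX90p", "SU"] returns the matching standard indices (by short
    name); "tasmax" returns every index computable from the queried
    variable(s); "HEAT" returns the indices of the matching IndexGroup.
    """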
if query == IndexGroupRegistry.WILD_CARD_GROUP or (
isinstance(query, str)
and query.lower() == IndexGroupRegistry.WILD_CARD_GROUP.name
):
# case of group="all"
return EcadIndexRegistry.values()
if not isinstance(query, (list, tuple)):
query = [query]
# -- Look for standard indices (e.g. index_group='tx90p')
indices = [EcadIndexRegistry.lookup_no_error(i) for i in query]
indices = list(filter(lambda x: x is not None, indices))
if len(indices) == len(query):
return indices
# -- Look for variables in standard indices (e.g. index_group='tasmax')
indices = []
for ecad_index in EcadIndexRegistry.values():
has_var = True
for var in ecad_index.input_variables:
is_query_in_aliases = (
standard_var in var.aliases for standard_var in query
)
has_var &= any(is_query_in_aliases)
if has_var:
indices.append(ecad_index)
if len(indices) >= len(query):
return indices
# -- Look for index group (e.g. index_group='HEAT')
groups = [IndexGroupRegistry.lookup_no_error(i) for i in query]
groups = list(filter(lambda x: x is not None, groups))
indices = (x.get_indices() for x in groups)
indices = reduce(operator.add, indices, []) # flatten list[list]
if len(indices) >= len(query):
return indices
msg = f"The index group {query} was not recognized."
raise InvalidIcclimArgumentError(msg)
def indice(*args, **kwargs) -> Dataset:
"""
Proxy for `icclim.index` function.
Deprecated: to be deleted in a future release.
"""
log.deprecation_warning(old="icclim.indice", new="icclim.index")
return index(*args, **kwargs)
def index(
in_files: InFileLike,
index_name: str
| GenericIndicator
| StandardIndex
| None = None, # optional when computing user_indices
var_name: str | Sequence[str] | None = None,
slice_mode: FrequencyLike | Frequency = "year",
time_range: Sequence[dt.datetime | str] | None = None,
out_file: str | None = None,
threshold: str | Threshold | Sequence[str | Threshold] | None = None,
callback: Callable[[int], None] = log.callback,
callback_percentage_start_value: int = 0,
callback_percentage_total: int = 100,
base_period_time_range: Sequence[dt.datetime] | Sequence[str] | None = None,
doy_window_width: int = 5,
only_leap_years: bool = False,
ignore_Feb29th: bool = False, # noqa: N803
interpolation: str | QuantileInterpolation = "median_unbiased",
out_unit: str | None = None,
netcdf_version: str | NetcdfVersion = "NETCDF4",
user_index: UserIndexDict | None = None,
save_thresholds: bool = False,
logs_verbosity: Verbosity | str = "LOW",
date_event: bool = False,
min_spell_length: int | None = 6,
rolling_window_width: int | None = 5,
sampling_method: SamplingMethodLike = RESAMPLE_METHOD,
*,
# deprecated params are kwargs only
window_width: int | None = None,
save_percentile: bool | None = None,
indice_name: str | None = None,
user_indice: UserIndexDict | None = None,
transfer_limit_Mbytes: float | None = None, # noqa: N803
) -> Dataset:
"""
Compute climate index.
This is the main entry point for icclim.
Parameters
----------
    in_files: str | list[str] | Dataset | DataArray | InputDictionary
        Absolute path(s) to NetCDF dataset(s), including OPeNDAP URLs,
        or a path to a zarr store, or an xarray.Dataset or xarray.DataArray.
    index_name: str | StandardIndex | GenericIndicator | None
        Climate index name.
        For an ECA&D index, the case-insensitive name used to look up the index.
        For a user index, it is the name of the output variable.
    var_name: str | list[str] | None
        ``optional`` Target variable name to process, corresponding to ``in_files``.
        If None (default) on an ECA&D index, the variable is guessed based on the
        requested climate index.
        Mandatory for a user index.
slice_mode: FrequencyLike | Frequency
Type of temporal aggregation:
        The possible values are ``{"year", "month", "DJF", "MAM", "JJA", "SON",
        "ONDJFM" or "AMJJAS", ("season", [1,2,3]), ("month", [1,2,3,])}``
(where season and month lists can be customized) or any valid pandas
frequency.
A season can also be defined between two exact dates:
``("season", ("19 july", "14 august"))``.
Default is "year".
See :ref:`slice_mode` for details.
    time_range: list[datetime.datetime] | list[str] | tuple[str, str] | None
        ``optional`` Temporal range: upper and lower bounds for temporal subsetting.
        If ``None``, the whole period of the input files is processed.
        The dates can either be given as instances of datetime.datetime or as string
        values. For strings, many formats are accepted.
        Default is ``None``.
    out_file: str | None
        Output NetCDF file name (default: "icclim_out.nc" in the current directory).
        If the input ``in_files`` is a ``Dataset``, the ``out_file`` field is ignored.
        Use the function's returned value instead to retrieve the computed result.
        If ``out_file`` already exists, icclim will overwrite it!
    threshold: float | list[float] | None
        ``optional`` User defined threshold for certain indices.
        The default depends on the index; see their individual definitions.
        When a list of thresholds is provided, the index is computed for each
        threshold.
transfer_limit_Mbytes: float
Deprecated, does not have any effect.
callback: Callable[[int], None]
``optional`` Progress bar printing. If ``None``, progress bar will not be
printed.
callback_percentage_start_value: int
``optional`` Initial value of percentage of the progress bar (default: 0).
callback_percentage_total: int
``optional`` Total percentage value (default: 100).
    base_period_time_range: list[datetime.datetime] | list[str] | tuple[str, str] | None
        ``optional`` Temporal range of the reference period.
        The dates can either be given as instances of datetime.datetime or as string
        values.
        It is used either:
        #. to compute percentiles if ``threshold`` is filled.
        When missing, the studied period is used to compute percentiles.
        The study period is either the dataset filtered by `time_range` or the whole
        dataset if `time_range` is missing.
        For day-of-year percentiles (doy_per) on extreme percentiles, the
        overlapping period between `base_period_time_range` and the study period is
        bootstrapped.
        #. to compute a reference period for indices such as difference_of_mean
        (a.k.a. anomaly) if a single variable is given in input.
    doy_window_width: int
        ``optional`` Window width used to aggregate day-of-year values when computing
        day-of-year percentiles (doy_per).
        Default: 5 (5 days).
    min_spell_length: int
        ``optional`` Minimum spell duration to be taken into account when computing
        the sum_of_spell_lengths.
    rolling_window_width: int
        ``optional`` Window width of the rolling window for indicators such as
        `{max_of_rolling_sum, max_of_rolling_average, min_of_rolling_sum, min_of_rolling_average}`.
    only_leap_years: bool
        ``optional`` Option for February 29th: when True, only leap years are used
        when computing day-of-year percentiles (default: False).
    ignore_Feb29th: bool
        ``optional`` When True, February 29th is ignored (default: False).
interpolation: str | QuantileInterpolation | None
``optional`` Interpolation method to compute percentile values:
``{"linear", "median_unbiased"}``
Default is "median_unbiased", a.k.a type 8 or method 8.
Ignored for non percentile based indices.
out_unit: str | None
``optional`` Output unit for certain indices: "days" or "%"
(default: "days").
    netcdf_version: str | NetcdfVersion
        ``optional`` NetCDF version to create (default: "NETCDF4").
user_index: UserIndexDict
``optional`` A dictionary with parameters for user defined index.
See :ref:`Custom indices`.
Ignored for ECA&D indices.
save_thresholds: bool
``optional`` True if the thresholds should be saved within the resulting
netcdf file (default: False).
    date_event: bool
        When True, the date of the event (such as when a maximum is reached) will be
        stored in coordinate variables.
        **warning** This option may significantly slow down computation.
logs_verbosity: str | Verbosity
``optional`` Configure how verbose icclim is.
Possible values: ``{"LOW", "HIGH", "SILENT"}`` (default: "LOW")
sampling_method: str
Choose whether the output sampling configured in `slice_mode` is a
`groupby` operation or a `resample` operation (as per xarray definitions).
Possible values:
``{"groupby", "resample", "groupby_ref_and_resample_study"}``
(default: "resample")
`groupby_ref_and_resample_study` may only be used when computing the
`difference_of_means` (a.k.a the anomaly).
indice_name: str | None
DEPRECATED, use index_name instead.
user_indice: dict | None
DEPRECATED, use user_index instead.
window_width: int
DEPRECATED, use doy_window_width, min_spell_length or rolling_window_width
instead.
save_percentile: bool
DEPRECATED, use save_thresholds instead.
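
    Returns
    -------
    xr.Dataset
        A Dataset with the computed index as a data variable.

    Examples
    --------
    Hedged sketches; the file names are illustrative and assume datasets
    providing the needed variables:

    >>> import icclim
    >>> su = icclim.index(
    ...     index_name="SU", in_files="tasmax.nc", slice_mode="month"
    ... )  # doctest: +SKIP
    >>> from icclim import build_threshold
    >>> hot_days = icclim.index(
    ...     index_name="count_occurrences",
    ...     in_files="tasmax.nc",
    ...     threshold=build_threshold(">= 30 degree_Celsius"),
    ... )  # doctest: +SKIP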
""" # noqa: E501
_setup(callback, callback_percentage_start_value, logs_verbosity)
(
index_name,
user_index,
save_thresholds,
doy_window_width,
) = _handle_deprecated_params(
index_name,
user_index,
save_thresholds,
indice_name,
transfer_limit_Mbytes,
user_indice,
save_percentile,
window_width,
doy_window_width,
)
del indice_name, transfer_limit_Mbytes, user_indice, save_percentile, window_width
config = _build_config(
in_files=in_files,
index_name=index_name,
var_name=var_name,
slice_mode=slice_mode,
time_range=time_range,
threshold=threshold,
callback=callback,
base_period_time_range=base_period_time_range,
doy_window_width=doy_window_width,
only_leap_years=only_leap_years,
ignore_Feb29th=ignore_Feb29th,
interpolation=interpolation,
out_unit=out_unit,
netcdf_version=netcdf_version,
user_index=user_index,
save_thresholds=save_thresholds,
date_event=date_event,
min_spell_length=min_spell_length,
rolling_window_width=rolling_window_width,
sampling_method=sampling_method,
)
result_ds = _compute_climate_index(
climate_index=config.indicator,
config=config,
initial_history=config.climate_variables[0].global_metadata["history"],
initial_source=config.climate_variables[0].global_metadata["source"],
rename=config.rename,
reference=config.reference,
)
if out_file is not None:
_write_output_file(
result_ds,
config.climate_variables[0].global_metadata["time_encoding"],
config.netcdf_version,
out_file,
)
callback(callback_percentage_total)
log.ending_message(time.process_time())
return result_ds
def _build_config(
in_files: InFileLike,
index_name: str | GenericIndicator | StandardIndex | None,
var_name: str | Sequence[str] | None,
slice_mode: FrequencyLike | Frequency,
time_range: Sequence[dt.datetime | str] | None,
threshold: str | Threshold | Sequence[str | Threshold] | None,
callback: Callable[[int], None],
base_period_time_range: Sequence[dt.datetime] | Sequence[str] | None,
doy_window_width: int,
only_leap_years: bool,
ignore_Feb29th: bool, # noqa: N803
interpolation: str | QuantileInterpolation,
out_unit: str | None,
netcdf_version: str | NetcdfVersion,
user_index: UserIndexDict | None,
save_thresholds: bool,
date_event: bool,
min_spell_length: int | None,
rolling_window_width: int | None,
sampling_method: SamplingMethodLike,
) -> IndexConfig:
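    """
    Build an IndexConfig for either a user index or a standard/generic index.

    A `user_index` takes precedence when provided; otherwise `index_name` is
    required.
    """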
if user_index is not None and (index_name is None or isinstance(index_name, str)):
return _build_user_index_config(
user_index,
in_files=in_files,
index_name=index_name,
var_name=var_name,
slice_mode=slice_mode,
time_range=time_range,
callback=callback,
base_period_time_range=base_period_time_range,
doy_window_width=doy_window_width,
only_leap_years=only_leap_years,
ignore_Feb29th=ignore_Feb29th,
interpolation=interpolation,
out_unit=out_unit,
netcdf_version=netcdf_version,
save_thresholds=save_thresholds,
date_event=date_event,
min_spell_length=min_spell_length,
rolling_window_width=rolling_window_width,
sampling_method=sampling_method,
)
if index_name is not None:
return _build_standard_index_config(
in_files=in_files,
index_name=index_name,
var_name=var_name,
slice_mode=slice_mode,
time_range=time_range,
threshold=threshold,
callback=callback,
base_period_time_range=base_period_time_range,
doy_window_width=doy_window_width,
only_leap_years=only_leap_years,
ignore_Feb29th=ignore_Feb29th,
interpolation=interpolation,
out_unit=out_unit,
netcdf_version=netcdf_version,
save_thresholds=save_thresholds,
date_event=date_event,
min_spell_length=min_spell_length,
rolling_window_width=rolling_window_width,
sampling_method=sampling_method,
)
msg = "You must fill either index_name or user_index" "to compute a climate index."
raise InvalidIcclimArgumentError(msg)
def _get_reference_period(
base_period_time_range: Sequence[dt.datetime | str] | None,
) -> tuple | None:
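    """
    Read `base_period_time_range` into a tuple of "%m-%d-%Y" strings.

    Illustrative example: ("1991-01-01", "2020-12-31") becomes
    ("01-01-1991", "12-31-2020").
    """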
if base_period_time_range is not None:
return tuple(
(read_date(t).strftime("%m-%d-%Y") for t in base_period_time_range),
)
return None
def _parse_threshold(
threshold: str | Threshold | Sequence[str | Threshold] | None,
doy_window_width: int,
reference_period: Sequence[dt.datetime | str] | None,
only_leap_years: bool,
interpolation: QuantileInterpolation,
) -> Threshold | Sequence[Threshold] | None:
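    """
    Build Threshold(s) from `threshold`, dispatching on its type.

    Illustrative inputs: a Threshold instance is returned unchanged,
    "> 25 degree_Celsius" builds a single Threshold, a list such as
    ["> 90 doy_per", "> 95 doy_per"] builds one Threshold per item,
    and None yields None.
    """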
if isinstance(threshold, Threshold):
return threshold
if isinstance(threshold, str):
return _build_threshold(
threshold,
doy_window_width=doy_window_width,
reference_period=reference_period,
only_leap_years=only_leap_years,
interpolation=interpolation,
)
if isinstance(threshold, Sequence):
return [
_build_threshold(
t,
doy_window_width=doy_window_width,
reference_period=reference_period,
only_leap_years=only_leap_years,
interpolation=interpolation,
)
for t in threshold
]
return None
def _build_user_index_config(
user_index: UserIndexDict,
in_files: InFileLike,
index_name: str | None,
var_name: str | Sequence[str] | None,
slice_mode: FrequencyLike | Frequency,
time_range: Sequence[dt.datetime | str] | None,
callback: Callable[[int], None],
base_period_time_range: Sequence[dt.datetime] | Sequence[str] | None,
doy_window_width: int,
only_leap_years: bool,
ignore_Feb29th: bool, # noqa: N803
interpolation: str | QuantileInterpolation,
out_unit: str | None,
netcdf_version: str | NetcdfVersion,
save_thresholds: bool,
date_event: bool,
min_spell_length: int | None,
rolling_window_width: int | None,
sampling_method: SamplingMethodLike,
) -> IndexConfig:
interpolation = QuantileInterpolationRegistry.lookup(interpolation)
indicator = parse.read_indicator(user_index)
sampling_frequency = FrequencyRegistry.lookup(slice_mode)
threshold = parse.read_thresholds(
user_index,
doy_window_width=doy_window_width,
reference_period=base_period_time_range,
only_leap_years=only_leap_years,
interpolation=interpolation,
)
logical_link = parse.read_logical_link(user_index)
coef = parse.read_coef(user_index)
date_event = parse.read_date_event(user_index)
rename = index_name or user_index.get("index_name", None) or "user_index"
output_unit = out_unit
rolling_window_width = user_index.get("window_width", rolling_window_width)
reference_period = _get_reference_period(
user_index.get("ref_time_range", base_period_time_range)
)
climate_vars_dict = build_input_dict(
in_files=in_files,
var_names=var_name,
threshold=threshold,
standard_index=None,
)
is_compared_to_ref = _must_add_reference_var(climate_vars_dict, reference_period)
climate_vars = build_climate_vars(
climate_vars_dict=climate_vars_dict,
ignore_Feb29th=ignore_Feb29th,
time_range=time_range,
base_period=reference_period,
standard_index=None,
is_compared_to_reference=is_compared_to_ref,
)
return IndexConfig(
save_thresholds=save_thresholds,
frequency=sampling_frequency,
climate_variables=climate_vars,
min_spell_length=min_spell_length,
rolling_window_width=rolling_window_width,
out_unit=output_unit,
netcdf_version=NetcdfVersionRegistry.lookup(netcdf_version),
interpolation=interpolation,
callback=callback,
is_compared_to_reference=is_compared_to_ref,
reference_period=reference_period,
indicator_name=indicator.name,
logical_link=logical_link,
coef=coef,
date_event=date_event,
sampling_method=sampling_method,
rename=rename,
indicator=indicator,
reference=ICCLIM_REFERENCE,
)
def _build_standard_index_config(
in_files: InFileLike,
index_name: str | GenericIndicator | StandardIndex,
var_name: str | Sequence[str] | None,
slice_mode: FrequencyLike | Frequency,
time_range: Sequence[dt.datetime | str] | None,
threshold: str | Threshold | Sequence[str | Threshold] | None,
callback: Callable[[int], None],
base_period_time_range: Sequence[dt.datetime] | Sequence[str] | None,
doy_window_width: int,
only_leap_years: bool,
ignore_Feb29th: bool, # noqa: N803
interpolation: str | QuantileInterpolation,
out_unit: str | None,
netcdf_version: str | NetcdfVersion,
save_thresholds: bool,
date_event: bool,
min_spell_length: int | None,
rolling_window_width: int | None,
sampling_method: SamplingMethodLike,
) -> IndexConfig:
interpolation = QuantileInterpolationRegistry.lookup(interpolation)
    # The logical link here links two climate_variable computations, as with
    # user_index. It is always AND for standard indices.
logical_link = LogicalLinkRegistry.LOGICAL_AND
sampling_frequency = FrequencyRegistry.lookup(slice_mode)
coef = None
index = _parse_index_kind(index_name)
if isinstance(index, StandardIndex):
standard_index = index.clone()
indicator = standard_index.indicator.clone()
threshold = standard_index.threshold
rename = standard_index.short_name
output_unit = out_unit or standard_index.output_unit
reference = standard_index.reference
indicator_name = standard_index.short_name
elif isinstance(index, GenericIndicator):
rename = None
output_unit = out_unit
standard_index = None
indicator = index.clone()
reference = ICCLIM_REFERENCE
indicator_name = indicator.name
else:
err = f"Unknown index_name : `{index_name}`"
raise InvalidIcclimArgumentError(err)
reference_period = _get_reference_period(base_period_time_range)
threshold = _parse_threshold(
threshold,
doy_window_width=doy_window_width,
reference_period=reference_period,
only_leap_years=only_leap_years,
interpolation=interpolation,
)
climate_vars_dict = build_input_dict(
in_files=in_files,
var_names=var_name,
threshold=threshold,
standard_index=standard_index,
)
is_compared_to_ref = _must_add_reference_var(climate_vars_dict, reference_period)
climate_vars = build_climate_vars(
climate_vars_dict=climate_vars_dict,
ignore_Feb29th=ignore_Feb29th,
time_range=time_range,
base_period=reference_period,
standard_index=standard_index,
is_compared_to_reference=is_compared_to_ref,
)
return IndexConfig(
save_thresholds=save_thresholds,
frequency=sampling_frequency,
climate_variables=climate_vars,
min_spell_length=min_spell_length,
rolling_window_width=rolling_window_width,
out_unit=output_unit,
netcdf_version=NetcdfVersionRegistry.lookup(netcdf_version),
interpolation=interpolation,
callback=callback,
is_compared_to_reference=is_compared_to_ref,
reference_period=reference_period,
indicator_name=indicator_name,
logical_link=logical_link,
coef=coef,
date_event=date_event,
sampling_method=sampling_method,
rename=rename,
indicator=indicator,
reference=reference,
)
def _parse_index_kind(
index_name: StandardIndex | GenericIndicator | str,
) -> StandardIndex | GenericIndicator:
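    """
    Resolve `index_name` to a StandardIndex or a GenericIndicator.

    String lookups cascade: first the ECA&D registry, then the DCSC registry,
    then the generic indicator registry (which raises for unknown names).
    """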
if isinstance(index_name, str):
index = EcadIndexRegistry.lookup_no_error(index_name)
if index is None:
index = DcscIndexRegistry.lookup_no_error(index_name)
if index is None:
index = GenericIndicatorRegistry.lookup(index_name)
return index
return index_name
def _write_output_file(
result_ds: xr.Dataset,
input_time_encoding: dict | None,
netcdf_version: NetcdfVersion,
file_path: str,
) -> None:
"""Write `result_ds` to a netCDF file on `out_file` path."""
if input_time_encoding:
time_encoding = {
"calendar": input_time_encoding.get("calendar"),
UNITS_KEY: input_time_encoding.get(UNITS_KEY),
"dtype": input_time_encoding.get("dtype"),
}
else:
time_encoding = {UNITS_KEY: "days since 1850-1-1"}
result_ds.to_netcdf(
file_path,
format=netcdf_version.name,
encoding={"time": time_encoding},
)
def _handle_deprecated_params(
index_name: str | GenericIndicator | StandardIndex | None,
user_index: UserIndexDict | None,
save_thresholds: bool,
indice_name: str | None,
transfer_limit_Mbytes: float | None, # noqa: N803
user_indice: UserIndexDict | None,
save_percentile: bool | None,
window_width: int | None,
doy_window_width: int | None,
) -> tuple[
    str | GenericIndicator | StandardIndex | None,
    UserIndexDict | None,
    bool,
    int,
]:
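    """
    Map deprecated parameters onto their replacements.

    Mapping: indice_name -> index_name; user_indice -> user_index;
    save_percentile -> save_thresholds; window_width -> doy_window_width.
    transfer_limit_Mbytes has no effect and only triggers a warning.
    """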
if indice_name is not None:
log.deprecation_warning(old="indice_name", new="index_name")
index_name = indice_name
if user_indice is not None:
log.deprecation_warning(old="user_indice", new="user_index")
user_index = user_indice
if transfer_limit_Mbytes is not None:
log.deprecation_warning(old="transfer_limit_Mbytes")
if save_percentile is not None:
log.deprecation_warning(old="save_percentile", new="save_thresholds")
save_thresholds = save_percentile
if window_width is not None:
log.deprecation_warning(old="window_width", new="doy_window_width")
doy_window_width = window_width
return index_name, user_index, save_thresholds, doy_window_width
def _setup(
callback: Callable[[int], None],
callback_start_value: int,
logs_verbosity: Verbosity | str,
) -> None:
    # make xclim input daily check a warning instead of an error
    # TODO @bzah: it might be safer to use a context manager to set up
    # and tear down these options
# https://github.com/cerfacs-globc/icclim/issues/289
xclim.set_options(data_validation="warn")
# keep attributes through xarray operations
xr.set_options(keep_attrs=True)
log.set_verbosity(logs_verbosity)
log.start_message()
callback(callback_start_value)
def _get_unit(output_unit: str | None, da: DataArray) -> str | None:
da_unit = da.attrs.get(UNITS_KEY, None)
if da_unit is None:
if output_unit is None:
warn(
"No unit computed or provided for the index was found."
" Use out_unit parameter to add one.",
stacklevel=2,
)
return ""
return output_unit
return da_unit
def _compute_climate_index(
climate_index: Indicator,
config: IndexConfig,
initial_history: str | None,
initial_source: str | None,
reference: str,
rename: str | None = None,
) -> Dataset:
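    """
    Run `climate_index` on `config` and post-process the result.

    Pipeline sketch: compute the indicator, rename the result, resolve its
    output unit, resample over time with time bounds (except for SPI indices),
    optionally merge the saved thresholds, then add history and CF metadata.
    """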
result_da = climate_index(config)
if rename:
result_da = result_da.rename(rename)
else:
result_da = result_da.rename(climate_index.name)
result_da.attrs[UNITS_KEY] = _get_unit(config.out_unit, result_da)
if (
config.frequency.post_processing is not None
and "time" in result_da.dims
and not isinstance(
climate_index,
(
StandardizedPrecipitationIndex6,
StandardizedPrecipitationIndex3,
),
)
):
resampled_da, time_bounds = config.frequency.post_processing(result_da)
result_ds = resampled_da.to_dataset()
if time_bounds is not None:
result_ds["time_bounds"] = time_bounds
result_ds.time.attrs["bounds"] = "time_bounds"
else:
result_ds = result_da.to_dataset()
if config.save_thresholds:
result_ds = xr.merge(
[result_ds, _format_thresholds_for_export(config.climate_variables)],
)
history = _build_history(result_da, config, initial_history, climate_index)
return _add_ecad_index_metadata(
result_ds,
climate_index,
history,
initial_source,
reference,
)
def _add_ecad_index_metadata(
result_ds: Dataset,
computed_index: Indicator,
history: str,
initial_source: str | None,
reference: str,
) -> Dataset:
result_ds.attrs.update(
{
"title": computed_index.standard_name,
"references": reference,
"institution": "Climate impact portal (https://climate4impact.eu)",
"history": history,
"source": initial_source if initial_source is not None else "",
"Conventions": "CF-1.6",
},
)
try:
result_ds.lat.encoding["_FillValue"] = None
result_ds.lon.encoding["_FillValue"] = None
except AttributeError:
try:
result_ds.latitude.encoding["_FillValue"] = None
result_ds.longitude.encoding["_FillValue"] = None
except AttributeError:
pass
return result_ds
def _build_history(
result_da: DataArray,
config: IndexConfig,
initial_history: str | None,
indice_computed: Indicator,
) -> str:
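    """
    Merge the initial and xclim histories and append the icclim calculation
    line, e.g. (illustrative values): " [2042-01-01 12:00:00] Calculation of
    SU index (yearly) - icclim version: 7.0.0".
    """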
from icclim import __version__ as icclim_version
if initial_history is None:
# get xclim history
initial_history = result_da.attrs[HISTORY_CF_KEY]
else:
# append xclim history
initial_history = f"{initial_history}\n{result_da.attrs[HISTORY_CF_KEY]}"
del result_da.attrs[HISTORY_CF_KEY]
current_time = dt.datetime.now(tz=dt.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S",
)
return (
f"{initial_history}\n"
f" [{current_time}]"
f" Calculation of {indice_computed.name}"
f" index ({config.frequency.adjective})"
f" - icclim version: {icclim_version}"
)
def _build_threshold(
threshold: str | Threshold,
doy_window_width: int,
reference_period: Sequence[dt.datetime | str] | None,
only_leap_years: bool,
interpolation: QuantileInterpolation,
) -> Threshold:
if isinstance(threshold, Threshold):
return threshold
return build_threshold(
threshold,
doy_window_width=doy_window_width,
reference_period=reference_period,
only_leap_years=only_leap_years,
interpolation=interpolation,
)
def _format_thresholds_for_export(climate_vars: list[ClimateVariable]) -> Dataset:
return xr.merge([_format_threshold(v) for v in climate_vars])
def _format_threshold(cf_var: ClimateVariable) -> DataArray:
return cf_var.threshold.value.rename(cf_var.name + "_thresholds").reindex()
def _must_add_reference_var(
climate_vars_dict: dict[str, InFileDictionary],
reference_period: Sequence[str] | None,
) -> bool:
"""
Check if the reference variable must be added to the input variables.
Return True whenever the input has no threshold and only one studied variable but
there is a reference period.
Example case: the anomaly of tx(1960-2100) by tx(1960-1990).
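
    A minimal doctest-style sketch (the dictionary shapes are illustrative; real
    inputs follow ``InFileDictionary``):

    >>> _must_add_reference_var({"tasmax": {}}, ("1960-01-01", "1990-12-31"))
    True
    >>> _must_add_reference_var(
    ...     {"tasmax": {}, "pr": {}}, ("1960-01-01", "1990-12-31")
    ... )
    False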
"""
t = next(iter(climate_vars_dict.values())).get("thresholds", None)
return t is None and len(climate_vars_dict) == 1 and reference_period is not None