"""Contain the GenericIndicator class."""
from __future__ import annotations
import contextlib
from copy import deepcopy
from functools import reduce
from typing import TYPE_CHECKING, Any, Callable
import numpy as np
import xarray as xr
from jinja2 import Environment
from pint import DefinitionSyntaxError, UndefinedUnitError
from xarray import DataArray
from xclim.core.calendar import select_time
from xclim.core.cfchecks import cfcheck_from_name
from xclim.core.datachecks import check_freq
from xclim.core.options import MISSING_METHODS, MISSING_OPTIONS, OPTIONS
from xclim.core.units import (
convert_units_to,
rate2amount,
units2pint,
)
from xclim.core.units import units as xc_units
from icclim._core.climate_variable import must_run_bootstrap
from icclim._core.constants import (
RESAMPLE_METHOD,
UNITS_KEY,
)
from icclim._core.generic.generic_templates import INDICATORS_TEMPLATES_EN
from icclim._core.model.indicator import Indicator
from icclim.exception import InvalidIcclimArgumentError
if TYPE_CHECKING:
import jinja2
from icclim._core.climate_variable import ClimateVariable
from icclim._core.model.index_config import IndexConfig
from icclim._core.model.indicator import MissingMethodLike
from icclim.frequency import Frequency
jinja_env = Environment(autoescape=True)
[docs]
class GenericIndicator(Indicator):
"""
GenericIndicator are climate indicators wich are not specific to a particular domain.
They can be computed from any climate variable and are combined with `Threshold` objects
to create personalized indicators.
Parameters
----------
name: str
The name of the indicator.
process: Callable[..., DataArray]
The function that processes the indicator.
definition: str
The definition of the indicator.
check_vars: Callable[[list[ClimateVariable], GenericIndicator], None], optional
A function that checks if the variables meet the indicator requirements.
Defaults to None.
sampling_methods: list[str], optional
A list of sampling methods that can be used with the indicator.
Defaults to None.
missing: str, optional
The method for handling missing values. Defaults to "from_context".
missing_options: dict, optional
Additional options for handling missing values. Defaults to None.
qualifiers: tuple, optional
Additional qualifiers for the indicator. Defaults to ().
Attributes
----------
missing: str
The method for handling missing values.
missing_options: dict | None
Additional options for handling missing values.
""" # noqa: E501
[docs]
missing_options: dict | None
def __init__(
self,
name: str,
process: Callable[..., DataArray],
definition: str,
check_vars: (
Callable[[list[ClimateVariable], GenericIndicator], None] | None
) = None,
sampling_methods: list[str] | None = None,
missing: str = "from_context",
missing_options: dict | None = None,
qualifiers: tuple = (),
) -> None:
"""
Initialize a GenericIndicator object.
Parameters
----------
name : str
The name of the indicator.
process : Callable[..., DataArray]
The processing function of the indicator.
definition : str
A definition for the indicator.
check_vars : Callable[[list[ClimateVariable], GenericIndicator], None] | None, optional
A function that checks the variables used by the indicator, by default None.
sampling_methods : list[str] | None, optional
The sampling methods used by the indicator, by default None.
missing : str, optional
The method for handling missing values, by default "from_context".
missing_options : Any, optional
The options for handling missing values, by default None.
qualifiers : tuple, optional
The qualifiers for the indicator, by default ().
Raises
------
ValueError
If `missing_options` is set with `missing` method being from context.
Notes
-----
See the `GenericIndicatorRegistry` class for a list of available indicators.
Examples
--------
>>> from icclim.generic_indices import GenericIndicator
>>> def process(climate_vars, resample_freq):
... out = climate_vars[0].studied_data + climate_vars[1].studied_data
... out.resample(time=resample_freq).mean()
... return out
>>> def check_vars(climate_vars, indicator):
... if len(climate_vars) != 2:
... raise ValueError(
... "This indicator requires exactly 2 climate variables."
... )
>>> indicator = GenericIndicator(
... name="test",
... process=process,
... definition="This is a test indicator",
... check_vars=check_vars,
... sampling_methods=["daily"],
... missing="skip",
... missing_options=None,
... qualifiers=(),
... )
""" # noqa: E501
super().__init__()
self.missing_options = missing_options
self.missing = missing
if self.missing == "from_context" and self.missing_options is not None:
err = (
"Cannot set `missing_options` with `missing` method being from context."
)
raise ValueError(err)
missing_method: MissingMethodLike = MISSING_METHODS[self.missing]
self._missing = missing_method.execute
if self.missing_options:
missing_method.validate(**self.missing_options)
en_indicator_templates = deepcopy(INDICATORS_TEMPLATES_EN[name])
self.name = name
self.process = process
self.standard_name = en_indicator_templates["standard_name"]
self.cell_methods = en_indicator_templates["cell_methods"]
self.long_name = en_indicator_templates["long_name"]
self.check_vars = check_vars
self.definition = definition
self.qualifiers = qualifiers
self.sampling_methods = (
sampling_methods if sampling_methods is not None else [RESAMPLE_METHOD]
)
[docs]
def preprocess(
self,
climate_vars: list[ClimateVariable],
jinja_scope: dict[str, Any],
output_frequency: Frequency,
src_freq: Frequency,
output_unit: str | None,
coef: float | None,
sampling_method: str,
) -> list[ClimateVariable]:
"""
Preprocesses the climate variables before computing the indicator.
Parameters
----------
climate_vars : list[ClimateVariable]
The list of climate variables to be preprocessed.
jinja_scope : dict[str, Any]
The Jinja scope used for formatting the template.
output_frequency : Frequency
The desired frequency of the output.
src_freq : Frequency
The source frequency of the climate variables.
output_unit : str | None
The desired output unit of the indicator. If None, no unit conversion is
performed.
coef : float | None
The coefficient to multiply the climate variable data with. If None,
no multiplication is performed.
sampling_method : str
The sampling method used for some specific indicators.
See `difference_of_means` for example.
Returns
-------
list[ClimateVariable]
The preprocessed climate variables.
"""
self._check_for_invalid_setup(climate_vars, sampling_method)
if output_unit is not None:
if _is_amount_unit(output_unit):
climate_vars = _convert_rates_to_amounts(
climate_vars=climate_vars,
output_unit=output_unit,
)
elif _is_a_diff_indicator(self) and output_unit != "%":
# [gh:255] Indicators computing the difference between two
# variables must first convert the units of input variables
# to the expected output unit in order to avoid converting
# the output of the difference.
# This is because a difference of relative units is not equivalent
# to a difference of absolute on scale units.
# In other words: a 15 Kelvin difference *is* equivalent
# to a 15 degC difference, but if we would convert the unit after
# computing the difference, we could get -258.15 degC from the
# 15 Kelvin.
for climate_var in climate_vars:
climate_var.studied_data = convert_units_to(
climate_var.studied_data,
target=output_unit,
)
if coef is not None:
for climate_var in climate_vars:
climate_var.studied_data = coef * climate_var.studied_data
if output_frequency.indexer:
for climate_var in climate_vars:
climate_var.studied_data = select_time(
climate_var.studied_data,
**output_frequency.indexer,
drop=True,
)
_check_data(climate_vars, src_freq.pandas_freq)
_check_cf(climate_vars)
self._format_template(jinja_scope=jinja_scope)
return climate_vars
[docs]
def postprocess(
self,
result: DataArray,
climate_vars: list[ClimateVariable],
output_freq: str,
src_freq: str,
indexer: dict,
out_unit: str | None,
) -> DataArray:
"""
Postprocesses the result of the indicator computation.
Parameters
----------
result : DataArray
The result of the indicator computation.
climate_vars : list[ClimateVariable]
The list of climate variables used for the computation.
output_freq : str
The desired output frequency of the postprocessed result.
src_freq : str
The source frequency of the input data.
indexer : dict
The indexer used to subset the input data.
out_unit : str | None
The desired output unit of the postprocessed result.
If None, no unit conversion is performed.
Returns
-------
DataArray
The postprocessed result.
"""
if out_unit is not None:
result = convert_units_to(result, out_unit, context="hydro")
if self.missing != "skip" and indexer is not None:
# reference variable is a subset of the studied variable,
# so no need to check it.
das = filter(lambda cv: not cv.is_reference, climate_vars)
das = (cv.studied_data for cv in das)
das = list(das)
if "time" in result.dims:
result = self._handle_missing_values(
resample_freq=output_freq,
src_freq=src_freq,
indexer=indexer,
in_data=das,
out_data=result,
)
for prop in self.templated_properties:
result.attrs[prop] = getattr(self, prop)
result.attrs["history"] = ""
return result
def __call__(self, config: IndexConfig) -> DataArray:
"""
Compute the indicator based on the given configuration.
Parameters
----------
config : IndexConfig
The configuration object containing the settings for computing the
indicator.
Returns
-------
DataArray
The computed indicator as a DataArray.
"""
src_freq = config.climate_variables[0].source_frequency
base_jinja_scope = {
"np": np,
"enumerate": enumerate,
"len": len,
"output_freq": config.frequency,
"source_freq": src_freq,
}
climate_vars_meta = _get_climate_vars_metadata(
config.climate_variables,
src_freq,
base_jinja_scope,
jinja_env,
)
jinja_scope: dict[str, Any] = {
"min_spell_length": config.min_spell_length,
"rolling_window_width": config.rolling_window_width,
"climate_vars": climate_vars_meta,
"is_compared_to_reference": config.is_compared_to_reference,
"reference_period": config.reference_period,
}
jinja_scope.update(base_jinja_scope)
climate_vars = self.preprocess(
climate_vars=config.climate_variables,
jinja_scope=jinja_scope,
output_frequency=config.frequency,
src_freq=src_freq,
output_unit=config.out_unit,
coef=config.coef,
sampling_method=config.sampling_method,
)
result = self.process(
climate_vars=climate_vars,
resample_freq=config.frequency,
min_spell_length=config.min_spell_length,
rolling_window_width=config.rolling_window_width,
group_by_freq=config.frequency.group_by_key,
is_compared_to_reference=config.is_compared_to_reference,
logical_link=config.logical_link,
date_event=config.date_event,
source_freq_delta=src_freq.delta,
to_percent=config.out_unit == "%",
sampling_method=config.sampling_method,
)
return self.postprocess(
result,
climate_vars=climate_vars,
output_freq=config.frequency.pandas_freq,
src_freq=src_freq.pandas_freq,
indexer=config.frequency.indexer,
out_unit=config.out_unit,
)
def __eq__(self, other: object) -> bool:
"""
Check if two GenericIndicator objects are equal.
Parameters
----------
other : Any
The object to compare with.
Returns
-------
bool
True if the two objects are equal, False otherwise.
"""
return (
isinstance(other, GenericIndicator)
and self.long_name == other.long_name
and self.standard_name == other.standard_name
and self.process == other.process
)
def __str__(self) -> str:
"""
Return the name of the indicator.
Returns
-------
str
The name of the indicator.
"""
return self.name
def _check_for_invalid_setup(
self,
climate_vars: list[ClimateVariable],
sampling_method: str,
) -> None:
if not _same_freq_for_all(climate_vars):
msg = (
"All variables must have the same time frequency (for example daily) to"
" be compared with each others, but this was not the case."
)
raise InvalidIcclimArgumentError(msg)
if sampling_method not in self.sampling_methods:
msg = (
f"{self.name} can only be computed with the following"
f" sampling_method(s): {self.sampling_methods}"
)
raise InvalidIcclimArgumentError(msg)
if self.check_vars is not None:
# Run indicator specific check method
self.check_vars(climate_vars, self)
def _format_template(self, jinja_scope: dict) -> None:
for templated_property in self.templated_properties:
template = jinja_env.from_string(
getattr(self, templated_property),
globals=jinja_scope,
)
setattr(self, templated_property, template.render())
def _handle_missing_values(
self,
in_data: list[DataArray],
resample_freq: str,
src_freq: str,
indexer: dict | None,
out_data: DataArray,
) -> DataArray:
options = self.missing_options or OPTIONS[MISSING_OPTIONS].get(self.missing, {})
# We flag periods according to the missing method. skip variables without a time
# coordinate.
missing_method: MissingMethodLike = MISSING_METHODS[self.missing] # typing
miss = (
missing_method.execute(da, resample_freq, src_freq, options, indexer)
for da in in_data
if "time" in da.coords
)
# Reduce by or and broadcast to ensure the same length in time
# When indexing is used and there are no valid points in the last period,
# mask will not include it
mask = reduce(np.logical_or, miss) # typing
if isinstance(mask, DataArray) and mask.time.size < out_data.time.size:
mask = mask.reindex(time=out_data.time, fill_value=True)
return out_data.where(~mask)
def _same_freq_for_all(climate_vars: list[ClimateVariable]) -> bool:
if len(climate_vars) == 1:
return True
freqs = [xr.infer_freq(a.studied_data.time) for a in climate_vars]
return all(x == freqs[0] for x in freqs[1:])
def _get_climate_vars_metadata(
climate_vars: list[ClimateVariable],
resample_freq: Frequency,
jinja_scope: dict[str, Any],
jinja_env: jinja2.Environment,
) -> list[dict[str, str]]:
return [
c_var.build_indicator_metadata(
resample_freq,
must_run_bootstrap(c_var.studied_data, c_var.threshold),
jinja_scope,
jinja_env,
)
for c_var in climate_vars
]
def _convert_rates_to_amounts(
climate_vars: list[ClimateVariable], output_unit: str
) -> list[ClimateVariable]:
for climate_var in climate_vars:
current_unit = climate_var.studied_data.attrs.get(UNITS_KEY, None)
if current_unit is not None and not _is_amount_unit(current_unit):
with xc_units.context("hydro"):
climate_var.studied_data = rate2amount(
climate_var.studied_data,
out_units=output_unit,
)
return climate_vars
def _is_amount_unit(unit: str) -> bool:
try:
u = units2pint(unit) # turn a cf u
return xc_units.Quantity(1, u).check("[length]")
except (UndefinedUnitError, DefinitionSyntaxError):
return False
[docs]
def _check_cf(climate_vars: list[ClimateVariable]) -> None:
"""Compare metadata attributes to CF-Convention standards.
Default cfchecks use the specifications in `xclim.core.utils.VARIABLES`,
assuming the indicator's inputs are using the CMIP6/xclim variable names
correctly.
Variables absent from these default specs are silently ignored.
When subclassing this method, use functions decorated using
`xclim.core.options.cfcheck`.
"""
for da in climate_vars:
with contextlib.suppress(KeyError):
# Silently ignore unknown variables.
cfcheck_from_name(str(da.name), da)
def _check_data(climate_vars: list[ClimateVariable], src_freq: str) -> None:
if src_freq is None:
return
for climate_var in climate_vars:
da = climate_var.studied_data
if "time" in da.coords and da.time.ndim == 1 and len(da.time) > 3:
check_freq(da, src_freq, strict=True)
def _is_a_diff_indicator(indicator: Indicator) -> bool:
return "compute_diff" in indicator.qualifiers