Source code for icclim.threshold.factory

"""Factory to build a `Threshold` from a query or from its components."""

from __future__ import annotations

import re
from collections.abc import Sequence
from typing import TYPE_CHECKING

from pint.errors import UndefinedUnitError
from xarray import DataArray, Dataset
from xclim.core.units import units as xc_units

from icclim._core.constants import (
    DOY_PERCENTILE_UNIT,
    PERIOD_PERCENTILE_UNIT,
)
from icclim._core.generic.threshold.basic import BasicThreshold
from icclim._core.generic.threshold.bounded import BoundedThreshold
from icclim._core.generic.threshold.percentile import PercentileThreshold
from icclim._core.input_parsing import (
    PercentileDataArray,
    find_standard_vars,
    get_name_of_first_var,
    is_dataset_path,
    read_dataset,
)
from icclim._core.model.logical_link import LogicalLink, LogicalLinkRegistry
from icclim._core.model.operator import Operator, OperatorRegistry
from icclim._core.model.threshold import (
    Threshold,
    ThresholdBuilderInput,
    ThresholdValueType,
)
from icclim._core.utils import is_number_sequence
from icclim.exception import InvalidIcclimArgumentError

if TYPE_CHECKING:
    import pint

VALUE_REGEX = re.compile(r"-?\d+\.?\d*")
OPERAND_REGEX = re.compile(r"[<>=]")


[docs] def build_threshold( query: str | None = None, *, operator: Operator | str | None = None, value: ThresholdValueType = None, unit: str | None = None, threshold_min_value: str | float | pint.Quantity | None = None, thresholds: Sequence[Threshold | str] | None = None, logical_link: str | LogicalLink | None = None, offset: str | float | pint.Quantity | None = None, **kwargs, ) -> BoundedThreshold | PercentileThreshold | BasicThreshold: """ Build a `Threshold`. This function is a factory for `Threshold` instances. It can build a `BasicThreshold`, a `PercentileThreshold` or a `BoundedThreshold`. See :ref:`generic_indices_recipes` for how to combine thresholds with generic indices. Parameters ---------- query: str | None = None string query describing a threshold. It must include: an operator, a threshold value and optionally a unit such as "> 10 degC". It acts as a shorthand for ``operator``, ``value`` and ``unit`` parameters for simple threshold. Don't use ``query`` when value is a DataArray, a Dataset or a path to a netcdf/zarr storage, instead use ``operator``, ``value`` and ``unit``. ``query`` supersede `operator`, `value` and `unit` parameters. operator: Operator | str = None keyword argument only. The operator either as an instance of Operator or as a compatible string. See :py:class:`OperatorRegistry` for the list of all operators. When ``query`` is None and operator is None, the default ``Operator.REACH`` is used. value: str | float | int | Dataset | DataArray | Sequence[float | int | str] | None keyword argument only. The threshold value(s), default to None. It can be: * a simple scalar threshold * a percentile that will be computed per-grid cell (in combinaison with `unit`) * per-grid cell thresholds defined by a DataArray, a Dataset or a string path to a netcdf/zarr. * a sequence of scalars, the indicator will then be computed for each value and a specific dimension will be created (also work with percentiles). unit: str | None = None Keyword argument only. The threshold unit. When ``unit`` is None, if ``value`` is a dataset and a "units" can be read from its `attrs`, this unit will be used. If value is a scalar or a sequence of scalar, the exceedance will be computed by assuming threshold has the same unit as the studied value is it compared to. When unit is a string it must be a valid unit of our shared pint registry with xclim or a special percentile unit: * "doy_per" for day of year percentiles (in ECAD, used for temperature indices such as TX90p) * "period_per" for per period percentiles (in ECAD, used for rain indices such as R75p) threshold_min_value: str | float | pint.Quantity A minimum value used to pre-filter computed threshold values. This is particularly useful to compute a percentile threshold for rain where dry days are usually ignored. In that case threshold_min_value would be set to "1 mm/day". If threshold_min_value is a number, ``unit`` is used to quantify ``threshold_min_value``. offset: float | None Optional. An offset applied to the threshold. This is particularly useful when the threshold is an existing dataset (netcdf file or zarr store) and the data should be compared to this dataset + an offset (e.g. to compute days when T > 5 degC above normal) kwargs Additional arguments to build a PercentileThreshold. See :py:class:`PercentileThreshold` constructor for the complete list of possible arguments. Examples -------- .. code-block:: python # -- Scalar threshold scalar_t = build_threshold(">= 30 degC") assert isinstance(scalar_t, BasicThreshold) # -- Daily percentile threshold doy_t = build_threshold(">= 30 doy_per") assert isinstance(doy_t, PercentileThreshold) # -- Per grid-cell threshold, with offset grided_t = build_threshold( operator=">=", value="path/to/tasmax_thresholds.nc", unit="K", offset=5 ) assert isinstance(grided_t, BasicThreshold) # -- Daily percentile threshold, from a file tasmax = xarray.open_dataset("path/to/tasmax_thresholds.nc").tasmax doys = xclim.core.calendar.percentile_doy(tasmax) doy_file_t = build_threshold(operator=">=", value=doys) assert isinstance(doy_file_t, PercentileThreshold) # -- Bounded threshold bounded_t = build_threshold(">= -20 degree AND <= 20 degree ") # equivalent to: x = build_threshold(">= -20 degree") y = build_threshold("<= 20 degree") bounded_t2 = x & y assert bounded_t == bounded_t2 # equivalent to: bounded_t3 = build_threshold(thresholds=[x, y], logical_link="AND") assert bounded_t == bounded_t3 assert isinstance(bounded_t, BoundedThreshold) """ input_thresh = _read_input( query, operator, value, unit, threshold_min_value, thresholds, logical_link, offset, **kwargs, ) if _must_build_per_threshold(input_thresh): return PercentileThreshold(**input_thresh) if _must_build_basic_threshold(input_thresh): return BasicThreshold(**input_thresh) if _must_build_bounded_threshold(input_thresh): return BoundedThreshold(**input_thresh) if _must_build_per_grid_cell_threshold(input_thresh): return BasicThreshold(**input_thresh) msg = f"Threshold cannot be built from a {type(value)}" raise NotImplementedError(msg)
def _get_operator(query: str) -> tuple[Operator | None, str]: operand = "".join(OPERAND_REGEX.findall(query)) op = OperatorRegistry.lookup_no_error(operand) if op is not None: return op, query.replace(op.operand, "", 1) return None, query def _read_string_threshold(query: str) -> tuple[str, str, float]: op, no_op_query = _get_operator(query) operand = op.operand if op else "" if DOY_PERCENTILE_UNIT in no_op_query: val = re.findall(VALUE_REGEX, no_op_query)[0] unit = DOY_PERCENTILE_UNIT elif PERIOD_PERCENTILE_UNIT in no_op_query: val = re.findall(VALUE_REGEX, no_op_query)[0] unit = PERIOD_PERCENTILE_UNIT else: try: quantity = xc_units.Quantity(no_op_query) except UndefinedUnitError as e: msg = f"Could not build threshold from {query}" raise InvalidIcclimArgumentError(msg) from e val = quantity.m unit = None if quantity.unitless else str(quantity.units) return operand, unit, val def _build_quantity( quantity: None | str | float | pint.Quantity, default_unit: str | None, ) -> pint.Quantity | None: if quantity is None: return None if isinstance(quantity, xc_units.Quantity): return quantity if isinstance(quantity, (float, int)): if default_unit in (PERIOD_PERCENTILE_UNIT, DOY_PERCENTILE_UNIT): unit = None else: unit = default_unit return xc_units.Quantity(value=quantity, units=unit) if isinstance(quantity, str): operator, unit, value = _read_string_threshold(quantity) if operator is not None and operator != "": msg = ( f"Cannot parse quantity '{quantity}'" f" The operator {operator} should be removed." ) raise InvalidIcclimArgumentError(msg) return xc_units.Quantity(value=value, units=unit) msg = f"Unknown type '{type(quantity)}' for quantity {quantity}" raise NotImplementedError(msg) def _read_input( query: str | None = None, operator: Operator | str | None = None, value: ThresholdValueType = None, unit: str | None = None, threshold_min_value: str | float | None = None, thresholds: tuple[Threshold, Threshold] | None = None, logical_link: str | LogicalLink | None = None, offset: str | float | pint.Quantity | None = None, **kwargs, ) -> ThresholdBuilderInput: if query is not None and _must_read_query(query, operator, value, unit): if _is_bounded_threshold_query(query): return _read_bounded_threshold_query(query) return _read_threshold_from_query(query, threshold_min_value, kwargs) if _must_read_bounded(operator, value, unit, thresholds, logical_link): return _read_bounded_threshold(thresholds, logical_link) if operator is not None: if (operator := OperatorRegistry.lookup_no_error(operator)) is None: operator = OperatorRegistry.REACH return { "operator": operator, "unit": unit, "value": value, "threshold_min_value": _build_quantity(threshold_min_value, unit), "offset": _build_quantity(offset, unit), **kwargs, } msg = "Could not read threshold" raise NotImplementedError(msg) def _read_bounded_threshold( thresholds: tuple[Threshold, Threshold], logical_link: LogicalLink | str, ) -> ThresholdBuilderInput: acc = [] for t in thresholds: if isinstance(t, str): acc.append(_read_input(t)) elif isinstance(t, Threshold): acc.append(t) else: msg = f"Unknown type '{type(t)}'" raise NotImplementedError(msg) if len(acc) > 2: msg = "Can't build BoundedThreshold on more than 2 thresholds." raise NotImplementedError(msg) if isinstance(logical_link, str): logical_link = LogicalLinkRegistry.lookup(logical_link) return { "initial_query": None, "thresholds": tuple(acc), "logical_link": logical_link, } def _read_threshold_from_query( query: str, threshold_min_value: None | str | float | pint.Quantity, kwargs: dict, ) -> ThresholdBuilderInput: operator, unit, value = _read_string_threshold(query) if (operator := OperatorRegistry.lookup_no_error(operator)) is None: operator = OperatorRegistry.REACH return { "operator": operator, "unit": unit, "value": value, "threshold_min_value": _build_quantity(threshold_min_value, unit), "initial_query": query, **kwargs, } def _must_read_query( query: str, operator: Operator | str | None, value: ThresholdValueType | None, unit: str | None, ) -> bool: return ( isinstance(query, str) and operator is None and value is None and unit is None ) def _must_read_bounded( operator: Operator | str | None, value: ThresholdValueType, unit: str | None, thresholds: tuple[Threshold, Threshold] | None, logical_link: str | LogicalLink | None, ) -> bool: return ( operator is None and value is None and unit is None and thresholds is not None and logical_link is not None ) def _is_bounded_threshold_query(query: str) -> bool: return any( l_l.name.upper() in query.upper() for l_l in LogicalLinkRegistry.values() ) def _read_bounded_threshold_query(query: str) -> ThresholdBuilderInput: link = None split_word = None uppered = query.upper() for l_l in LogicalLinkRegistry.values(): if l_l.name.upper() in uppered: link = l_l index_of_link = uppered.index(l_l.name.upper()) split_word = query[index_of_link : index_of_link + len(l_l.name)] break if link is None: msg = f"No logical link found in {query}" raise InvalidIcclimArgumentError(msg) threshs = [t.strip() for t in query.split(split_word)] if len(threshs) != 2: msg = ( "BoundedThreshold can only be built on 2" f" thresholds. We found {len(threshs)} here." ) raise InvalidIcclimArgumentError(msg) return { "initial_query": query, "thresholds": (_read_input(threshs[0]), _read_input(threshs[1])), "logical_link": link, } def _must_build_per_threshold(builder_input: ThresholdBuilderInput) -> bool: value = builder_input.get("value") unit = builder_input.get("unit", None) var_name = builder_input.get("threshold_var_name", None) return _has_per_unit(unit, value) or _is_per_dataset(var_name, value) def _is_per_dataset(threshold_var_name: str, value: str | Dataset | DataArray) -> bool: if is_dataset_path(value) or isinstance(value, Dataset): thresh_da = _get_dataarray_from_dataset(threshold_var_name, value) else: thresh_da = value return PercentileDataArray.is_compatible(thresh_da) def _has_per_unit(unit: str | None | Sequence[float], value: float) -> bool: return isinstance(value, (float, Sequence)) and ( unit in (DOY_PERCENTILE_UNIT, PERIOD_PERCENTILE_UNIT) ) def _must_build_per_grid_cell_threshold(builder_input: ThresholdBuilderInput) -> bool: value = builder_input.get("value") threshold_var_name = builder_input.get("threshold_var_name", None) if value is None: # Case of comparison to normal like dcsc::TxND, # the threshold is setup but ::prepare must be called # to initialize the value return True if is_dataset_path(value) or isinstance(value, Dataset): thresh_da = _get_dataarray_from_dataset(threshold_var_name, value) return not PercentileDataArray.is_compatible(thresh_da) return False def _must_build_basic_threshold(builder_input: ThresholdBuilderInput) -> bool: value = builder_input.get("value") return is_number_sequence(value) or isinstance(value, (DataArray, float, int)) def _must_build_bounded_threshold(builder_input: ThresholdBuilderInput) -> bool: logical_link = builder_input.get("logical_link") return logical_link is not None def _get_dataarray_from_dataset( threshold_var_name: str | None, value: Dataset | str, ) -> DataArray: ds = value if isinstance(value, Dataset) else read_dataset(value, standard_var=None) if threshold_var_name is None: if len(ds.data_vars) == 1: threshold_var_name = get_name_of_first_var(ds) else: names = find_standard_vars(ds) if len(names) == 1: threshold_var_name = names[0] else: msg = ( f"Could not guess the variable to use as a threshold in {ds}." f" Use `threshold_var_name` to specify which variable should be" f" used." ) raise InvalidIcclimArgumentError(msg) return ds[threshold_var_name]