"""
Implementations of the generic indices computation methods.
These functions are not meant to be called directly, they are used by the
`GenericIndicatorRegistry` class to create the generic indices.
Each function is a reducer that takes a list of `ClimateVariable` instances and returns
a `DataArray` instance.
The `ClimateVariable` instances are used to extract the data and the thresholds needed
to compute the generic index.
The `DataArray` instance is the result of the computation of the generic index.
.. note::
You can call the respective generic index from icclim module, for example:
`icclim.count_occurrences(...)`.
"""
from __future__ import annotations
import operator
from functools import partial
from typing import TYPE_CHECKING, Callable
from warnings import warn
import xarray as xr
from xarray import DataArray
from xarray.core.resample import DataArrayResample
from xarray.core.rolling import DataArrayRolling
from xclim.core.calendar import build_climatology_bounds
from xclim.core.units import (
convert_units_to,
rate2amount,
str2pint,
to_agg_units,
)
from xclim.indices import run_length
from icclim._core.climate_variable import must_run_bootstrap
from icclim._core.constants import (
GROUP_BY_METHOD,
GROUP_BY_REF_AND_RESAMPLE_STUDY_METHOD,
PART_OF_A_WHOLE_UNIT,
REFERENCE_PERIOD_ID,
RESAMPLE_METHOD,
UNITS_KEY,
)
from icclim._core.input_parsing import PercentileDataArray
from icclim._core.model.cf_calendar import CfCalendarRegistry
from icclim._core.model.operator import OperatorRegistry
from icclim.exception import InvalidIcclimArgumentError
from icclim.frequency import RUN_INDEXER, Frequency, FrequencyRegistry
if TYPE_CHECKING:
from datetime import timedelta
import numpy as np
from pint import Quantity
from xarray.core.groupby import DataArrayGroupBy
from icclim._core.climate_variable import ClimateVariable
from icclim._core.generic.threshold.percentile import PercentileThreshold
from icclim._core.model.logical_link import LogicalLink
from icclim._core.model.threshold import Threshold
[docs]
def count_occurrences(
    climate_vars: list[ClimateVariable],
    resample_freq: Frequency,
    logical_link: LogicalLink,
    date_event: bool,
    to_percent: bool,
    **kwargs,  # noqa: ARG001
) -> DataArray:
    """
    Count the occurrences of exceedances of the threshold(s).

    Parameters
    ----------
    climate_vars : list[ClimateVariable]
        The list of climate variables containing the data and the threshold(s).
    resample_freq : Frequency
        The time frequency of the output.
    logical_link : LogicalLink
        The logical link to apply to the exceedances if multiple thresholds are
        provided.
    date_event : bool
        Whether to return the date of the event.
    to_percent : bool
        Whether to return the result in percent.
    **kwargs : dict
        Ignored keyword arguments (for compatibility).

    Returns
    -------
    DataArray
        The count of occurrences of exceedances.
    """
    merged_exceedances = _compute_exceedances(
        climate_vars,
        resample_freq.pandas_freq,
        logical_link,
    )
    resampled = merged_exceedances.resample(time=resample_freq.pandas_freq)
    if date_event:
        result = _count_occurrences_with_date(resampled)
    else:
        # Exceedances are booleans, so a plain sum per period is the count.
        result = resampled.sum(dim="time")
    if to_percent:
        result = _to_percent(result, resample_freq)
        result.attrs[UNITS_KEY] = "%"
        return result
    return to_agg_units(result, climate_vars[0].studied_data, "count")
[docs]
def max_consecutive_occurrence(
    climate_vars: list[ClimateVariable],
    resample_freq: Frequency,
    logical_link: LogicalLink,
    date_event: bool,
    source_freq_delta: timedelta,
    **kwargs,  # noqa: ARG001
) -> DataArray:
    """
    Compute the longest run of consecutive exceedances per resampling period.

    Parameters
    ----------
    climate_vars : list[ClimateVariable]
        The list of climate variables containing the data and the threshold(s).
    resample_freq : Frequency
        The time frequency of the output.
    logical_link : LogicalLink
        The logical link to apply when merging the exceedances.
    date_event : bool
        Whether to include the dates of the exceedances in the result.
    source_freq_delta : timedelta
        The time difference between consecutive data points in the source data.
    **kwargs : dict
        Ignored keyword arguments (for compatibility).

    Returns
    -------
    DataArray
        The maximum number of consecutive occurrences of exceedances.
    """
    exceedances = _compute_exceedances(
        climate_vars,
        resample_freq.pandas_freq,
        logical_link,
    )
    # Each run's length is recorded at the run's first time step.
    run_lengths = run_length.rle(exceedances, dim="time", index="first")
    per_period = run_lengths.resample(time=resample_freq.pandas_freq)
    result = (
        _consecutive_occurrences_with_dates(per_period, source_freq_delta)
        if date_event
        else per_period.max(dim="time")
    )
    return to_agg_units(result, climate_vars[0].studied_data, "count")
[docs]
def sum_of_spell_lengths(
    climate_vars: list[ClimateVariable],
    resample_freq: Frequency,
    logical_link: LogicalLink,
    min_spell_length: int,
    **kwargs,  # noqa: ARG001
) -> DataArray:
    """
    Calculate the sum of the lengths of all spells in the data.

    This function calculates the sum of the lengths of all spells in the data,
    where a spell is defined as a consecutive occurrence of exceedances.

    Parameters
    ----------
    climate_vars : list[ClimateVariable]
        The list of climate variables containing the data and the threshold(s).
    resample_freq : Frequency
        The time frequency of the output.
    logical_link : LogicalLink
        The logical link to apply when merging the exceedances.
    min_spell_length : int
        The minimum length of a spell to consider.
    **kwargs : dict
        Ignored keyword arguments (for compatibility).

    Returns
    -------
    DataArray
        The sum of the lengths of all spells in the data.
    """
    merged_exceedances = _compute_exceedances(
        climate_vars,
        resample_freq.pandas_freq,
        logical_link,
    )
    # With index="first", `rle` records each spell's full length on the spell's
    # first time step; the `where` below zeroes every other position as well as
    # spells shorter than `min_spell_length`.
    rle = run_length.rle(merged_exceedances, dim="time", index="first")
    cropped_rle = rle.where(rle >= min_spell_length, other=0)
    # Summing per period yields the total of all spell lengths; a `max` here
    # would instead return the longest spell, which is what
    # `max_consecutive_occurrence` computes.
    result = cropped_rle.resample(time=resample_freq.pandas_freq).sum(dim="time")
    return to_agg_units(result, climate_vars[0].studied_data, "count")
[docs]
def excess(
    climate_vars: list[ClimateVariable],
    resample_freq: Frequency,
    **kwargs,  # noqa: ARG001
) -> DataArray:
    """
    Compute the excess of a climate variable above a threshold using the 'reach' operator.

    Parameters
    ----------
    climate_vars : list[ClimateVariable]
        List of climate variables. Only the first variable is used.
    resample_freq : Frequency
        The time frequency of the output.
    **kwargs : dict
        Ignored keyword arguments (for compatibility).

    Returns
    -------
    DataArray
        DataArray containing the computed excess values.

    Raises
    ------
    InvalidIcclimArgumentError
        If the threshold operator is not 'reach'.

    Notes
    -----
    The excess is computed by subtracting the threshold from the climate variable data.
    Only the values above the threshold are considered, and negative values are set to
    zero.
    The resulting excess values are then summed over the specified resample frequency.
    """  # noqa: E501
    # Use the same `get_single_var` helper as the sibling single-variable
    # indices (deficit, fraction_of_total, percentile) for consistency.
    study, threshold = get_single_var(climate_vars)
    if threshold.operator is not OperatorRegistry.REACH:
        msg = "Excess can only be computed with 'reach' operator."
        raise InvalidIcclimArgumentError(msg)
    # `override_op` turns the comparison into a subtraction: study - threshold.
    excesses = threshold.compute(study, override_op=operator.sub)
    res = (
        (excesses).clip(min=0).resample(time=resample_freq.pandas_freq).sum(dim="time")
    )
    res = res.assign_attrs(units=f"delta_{res.attrs['units']}")
    return to_agg_units(res, study, "integral")
[docs]
def deficit(
    climate_vars: list[ClimateVariable],
    resample_freq: Frequency,
    **kwargs,  # noqa: ARG001
) -> DataArray:
    """
    Compute the deficit of a climate variable below a threshold.

    Parameters
    ----------
    climate_vars : list[ClimateVariable]
        List of climate variables. Only the first variable is used.
    resample_freq : Frequency
        The time frequency of the output.
    **kwargs : dict
        Ignored keyword arguments (for compatibility).

    Returns
    -------
    DataArray
        DataArray containing the computed deficit values.

    Notes
    -----
    The deficit is computed as ``threshold - data``; negative values (data above
    the threshold) are clipped to zero, and the remaining deficits are summed
    over the resampling frequency.
    """
    study, threshold = get_single_var(climate_vars)
    # `override_op` turns the comparison into `threshold - data`.
    deficits = threshold.compute(study, override_op=lambda da, th: th - da)
    summed = (
        deficits.clip(min=0).resample(time=resample_freq.pandas_freq).sum(dim="time")
    )
    summed = summed.assign_attrs(units=f"delta_{summed.attrs['units']}")
    return to_agg_units(summed, study, "integral")
[docs]
def fraction_of_total(
    climate_vars: list[ClimateVariable],
    resample_freq: Frequency,
    to_percent: bool,
    **kwargs,  # noqa: ARG001
) -> DataArray:
    """
    Calculate the fraction of total for a given climate variable.

    The fraction of total is the sum of values exceeding the threshold divided
    by the total sum of values, per resampling period.

    Parameters
    ----------
    climate_vars : list[ClimateVariable]
        The list of climate variables containing the data and the threshold(s).
        Only one variable is expected in the list.
    resample_freq : Frequency
        The resampling frequency.
    to_percent : bool
        Flag indicating whether to convert the result to percentage.
    **kwargs : dict
        Ignored keyword arguments (for compatibility).

    Returns
    -------
    DataArray
        The fraction of total as a DataArray, in "%" if `to_percent` is True,
        otherwise in PART_OF_A_WHOLE_UNIT (1).
    """
    study, threshold = get_single_var(climate_vars)
    freq = resample_freq.pandas_freq
    if threshold.threshold_min_value is None:
        total = study.resample(time=freq).sum(dim="time")
    else:
        # Only values reaching the threshold's minimum value contribute to the
        # denominator.
        floor = convert_units_to(
            threshold.threshold_min_value, study, context="hydro"
        )
        total = (
            study.where(threshold.operator(study, floor))
            .resample(time=freq)
            .sum(dim="time")
        )
    exceedance = _compute_exceedance(
        study=study,
        threshold=threshold,
        freq=freq,
        bootstrap=must_run_bootstrap(study, threshold),
    ).squeeze()
    over = study.where(exceedance, 0).resample(time=freq).sum(dim="time")
    fraction = over / total
    if to_percent:
        fraction = fraction * 100
        fraction.attrs[UNITS_KEY] = "%"
    else:
        fraction.attrs[UNITS_KEY] = PART_OF_A_WHOLE_UNIT
    return fraction
[docs]
def maximum(
    climate_vars: list[ClimateVariable],
    resample_freq: Frequency,
    date_event: bool,
    **kwargs,  # noqa: ARG001
) -> DataArray:
    """
    Calculate the maximum value of the given climate variables.

    Parameters
    ----------
    climate_vars : list[ClimateVariable]
        List of climate variables to calculate the maximum value for.
    resample_freq : Frequency
        The frequency at which to resample the data.
    date_event : bool
        Flag indicating whether the output should include the date of the events.
    **kwargs : dict
        Ignored keyword arguments (for compatibility).

    Returns
    -------
    DataArray
        The maximum value of the climate variables.
    """
    return _run_simple_reducer(
        climate_vars,
        resample_freq,
        DataArrayResample.max,
        date_event,
    )
[docs]
def minimum(
    climate_vars: list[ClimateVariable],
    resample_freq: Frequency,
    date_event: bool,
    **kwargs,  # noqa: ARG001
) -> DataArray:
    """
    Calculate the minimum value of the given climate variables.

    Parameters
    ----------
    climate_vars : list[ClimateVariable]
        List of climate variables to calculate the minimum value for.
    resample_freq : Frequency
        The frequency at which to resample the data.
    date_event : bool
        Flag indicating whether the output should include the date of the events.
    **kwargs : dict
        Ignored keyword arguments (for compatibility).

    Returns
    -------
    DataArray
        The minimum value of the climate variables.
    """
    return _run_simple_reducer(
        climate_vars,
        resample_freq,
        DataArrayResample.min,
        date_event,
    )
[docs]
def average(
    climate_vars: list[ClimateVariable],
    resample_freq: Frequency,
    **kwargs,  # noqa: ARG001
) -> DataArray:
    """
    Compute the average of the given climate variables.

    Parameters
    ----------
    climate_vars : list[ClimateVariable]
        List of climate variables to compute the average for.
    resample_freq : Frequency
        The frequency at which to resample the data.
    **kwargs
        Ignored keyword arguments (for compatibility).

    Returns
    -------
    DataArray
        The computed average as a DataArray.
    """
    return _run_simple_reducer(
        climate_vars,
        resample_freq,
        DataArrayResample.mean,
        date_event=False,
    )
[docs]
def generic_sum(
    climate_vars: list[ClimateVariable],
    resample_freq: Frequency,
    **kwargs,  # noqa: ARG001
) -> DataArray:
    """
    Compute the sum of the given climate variables.

    Parameters
    ----------
    climate_vars : list[ClimateVariable]
        List of climate variables to compute the sum for.
    resample_freq : Frequency
        The frequency at which to resample the data.
    **kwargs
        Ignored keyword arguments (for compatibility).

    Returns
    -------
    DataArray
        The computed sum as a DataArray.
    """
    # Rates (e.g. precipitation flux) must be converted to amounts before
    # they can be meaningfully summed over time.
    return _run_simple_reducer(
        climate_vars,
        resample_freq,
        DataArrayResample.sum,
        date_event=False,
        must_convert_rate=True,
    )
[docs]
def standard_deviation(
    climate_vars: list[ClimateVariable],
    resample_freq: Frequency,
    **kwargs,  # noqa: ARG001
) -> DataArray:
    """
    Compute the standard deviation of the given climate variables.

    The standard deviation quantifies the amount of variation or spread in the
    values of the climate variables.

    Parameters
    ----------
    climate_vars : list[ClimateVariable]
        List of climate variables to compute the standard deviation for.
    resample_freq : Frequency
        The frequency at which to resample the data.
    **kwargs
        Ignored keyword arguments (for compatibility).

    Returns
    -------
    DataArray
        The computed standard deviation as a DataArray.
    """
    return _run_simple_reducer(
        climate_vars=climate_vars,
        resample_freq=resample_freq,
        reducer_op=DataArrayResample.std,
        date_event=False,
    )
[docs]
def max_of_rolling_sum(
    climate_vars: list[ClimateVariable],
    resample_freq: Frequency,
    rolling_window_width: int,
    date_event: bool,
    source_freq_delta: timedelta,
    **kwargs,  # noqa: ARG001
) -> DataArray:
    """
    Compute the maximum of the rolling sum of the given climate variables.

    Parameters
    ----------
    climate_vars : list[ClimateVariable]
        List of climate variables to compute the maximum rolling sum for.
    resample_freq : Frequency
        The frequency at which to resample the data.
    rolling_window_width : int
        The number of values included in each rolling sum.
    date_event : bool
        Whether the dates of the events should be included in the output.
    source_freq_delta : timedelta
        The time difference between consecutive data points in the source data.
        For daily data this is 1 day, for monthly data this is 1 month, etc.
    **kwargs
        Ignored keyword arguments (for compatibility).

    Returns
    -------
    DataArray
        The maximum value of the rolling sum as a DataArray.
    """
    return _run_rolling_reducer(
        climate_vars,
        resample_freq,
        rolling_window_width,
        DataArrayRolling.sum,
        DataArrayResample.max,
        date_event,
        source_freq_delta,
    )
[docs]
def min_of_rolling_sum(
    climate_vars: list[ClimateVariable],
    resample_freq: Frequency,
    rolling_window_width: int,
    date_event: bool,
    source_freq_delta: timedelta,
    **kwargs,  # noqa: ARG001
) -> DataArray:
    """
    Compute the minimum of the rolling sum of the given climate variables.

    Parameters
    ----------
    climate_vars : list[ClimateVariable]
        List of climate variables to compute the minimum rolling sum for.
    resample_freq : Frequency
        The frequency at which to resample the data.
    rolling_window_width : int
        The number of values included in each rolling sum.
    date_event : bool
        Whether the dates of the events should be included in the output.
    source_freq_delta : timedelta
        The time difference between consecutive data points in the source data.
        For daily data this is 1 day, for monthly data this is 1 month, etc.
    **kwargs
        Ignored keyword arguments (for compatibility).

    Returns
    -------
    DataArray
        The minimum value of the rolling sum as a DataArray.
    """
    return _run_rolling_reducer(
        climate_vars,
        resample_freq,
        rolling_window_width,
        DataArrayRolling.sum,
        DataArrayResample.min,
        date_event,
        source_freq_delta,
    )
[docs]
def min_of_rolling_average(
    climate_vars: list[ClimateVariable],
    resample_freq: Frequency,
    rolling_window_width: int,
    date_event: bool,
    source_freq_delta: timedelta,
    **kwargs,  # noqa: ARG001
) -> DataArray:
    """
    Compute the minimum of the rolling average of the given climate variables.

    Parameters
    ----------
    climate_vars : list[ClimateVariable]
        List of climate variables to compute the minimum rolling average for.
    resample_freq : Frequency
        The frequency at which to resample the data.
    rolling_window_width : int
        The number of values included in each rolling average.
    date_event : bool
        Whether the dates of the events should be included in the output.
    source_freq_delta : timedelta
        The time difference between consecutive data points in the source data.
        For daily data this is 1 day, for monthly data this is 1 month, etc.
    **kwargs
        Ignored keyword arguments (for compatibility).

    Returns
    -------
    DataArray
        The minimum value of the rolling average as a DataArray.
    """
    return _run_rolling_reducer(
        climate_vars,
        resample_freq,
        rolling_window_width,
        DataArrayRolling.mean,
        DataArrayResample.min,
        date_event,
        source_freq_delta,
    )
[docs]
def max_of_rolling_average(
    climate_vars: list[ClimateVariable],
    resample_freq: Frequency,
    rolling_window_width: int,
    date_event: bool,
    source_freq_delta: timedelta,
    **kwargs,  # noqa: ARG001
) -> DataArray:
    """
    Compute the maximum value of the rolling average of the given climate variables.

    The rolling average is the average of values over a specified rolling window width.
    The maximum value is the highest value obtained from the rolling average.

    Parameters
    ----------
    climate_vars : list[ClimateVariable]
        List of climate variables to compute the maximum value of the rolling average
        for.
    resample_freq : Frequency
        The frequency at which to resample the data.
    rolling_window_width : int
        The width of the rolling window, i.e., the number of values to include in each
        rolling average.
    date_event : bool
        A flag indicating whether the date of the events should be included in the
        output.
    source_freq_delta : timedelta
        The time difference between consecutive data points in the source data.
        For daily data this is 1 day, for monthly data this is 1 month, etc.
    **kwargs
        Ignored keyword arguments (for compatibility).

    Returns
    -------
    DataArray
        The computed maximum value of the rolling average as a DataArray.
    """
    return _run_rolling_reducer(
        climate_vars=climate_vars,
        resample_freq=resample_freq,
        rolling_window_width=rolling_window_width,
        rolling_op=DataArrayRolling.mean,
        resampled_op=DataArrayResample.max,
        date_event=date_event,
        source_freq_delta=source_freq_delta,
    )
[docs]
def mean_of_difference(
    climate_vars: list[ClimateVariable],
    resample_freq: Frequency,
    **kwargs,  # noqa: ARG001
) -> DataArray:
    """
    Calculate the mean of the difference between two climate variables.

    The per-time-step difference between the two variables is averaged over
    each resampling period. The result keeps the study variable's units.

    Parameters
    ----------
    climate_vars : list[ClimateVariable]
        The two climate variables necessary to compute the indicator.
    resample_freq : Frequency
        Resampling frequency of the output.
    **kwargs : dict
        Ignored keyword arguments (for compatibility).

    Returns
    -------
    DataArray
        The mean of the difference as a xarray.DataArray.

    Notes
    -----
    This is a generification of ECAD's DTR climate index.
    """
    study, ref = get_couple_of_var(climate_vars, "mean_of_difference")
    diff = study - ref
    result = diff.resample(time=resample_freq.pandas_freq).mean()
    result.attrs["units"] = study.attrs["units"]
    return result
[docs]
def difference_of_extremes(
    climate_vars: list[ClimateVariable],
    resample_freq: Frequency,
    **kwargs,  # noqa: ARG001
) -> DataArray:
    """
    Calculate the difference of extremes between two climate variables.

    Per resampling period, the maximum of the study variable and the minimum of
    the reference variable are computed, and their difference is returned.

    Parameters
    ----------
    climate_vars : list[ClimateVariable]
        A list of climate variables.
    resample_freq : Frequency
        The frequency at which to resample the data.
    **kwargs
        Ignored keyword arguments (for compatibility).

    Returns
    -------
    DataArray
        The difference of extremes between the two climate variables.

    Notes
    -----
    This is a generification of ECAD's ETR climate index.
    """
    study, ref = get_couple_of_var(climate_vars, "difference_of_extremes")
    freq = resample_freq.pandas_freq
    result = study.resample(time=freq).max() - ref.resample(time=freq).min()
    result.attrs["units"] = study.attrs["units"]
    return result
[docs]
def mean_of_absolute_one_time_step_difference(
    climate_vars: list[ClimateVariable],
    resample_freq: Frequency,
    **kwargs,  # noqa: ARG001
) -> DataArray:
    """
    Mean of the absolute one-time-step difference between two climate variables.

    The difference between the two variables is differentiated along time, its
    absolute value is taken, and the result is averaged over each resampling
    period. The result keeps the study variable's units.

    Parameters
    ----------
    climate_vars : list[ClimateVariable]
        The two climate variables necessary to compute the indicator.
    resample_freq : Frequency
        Resampling frequency of the output.
    **kwargs : dict
        Ignored keyword arguments (for compatibility).

    Returns
    -------
    DataArray
        The mean of the absolute one-time-step difference as a xarray.DataArray.

    Notes
    -----
    This is a generification of ECAD's vDTR climate index.
    """
    study, ref = get_couple_of_var(
        climate_vars,
        "mean_of_absolute_one_time_step_difference",
    )
    step_diff = (study - ref).diff(dim="time")
    result = abs(step_diff).resample(time=resample_freq.pandas_freq).mean()
    result.attrs["units"] = study.attrs["units"]
    return result
[docs]
def difference_of_means(
    climate_vars: list[ClimateVariable],
    to_percent: bool,
    resample_freq: Frequency,
    sampling_method: str,
    is_compared_to_reference: bool,
    **kwargs,  # noqa: ARG001
) -> DataArray:
    """
    Calculate the difference of means between two climate variables.

    Parameters
    ----------
    climate_vars : list[ClimateVariable]
        A studied climate variable and a reference climate variable.
    to_percent : bool
        If True, the result will be converted to percentage.
    resample_freq : Frequency
        Resampling frequency of the output.
    sampling_method : str
        The method used for resampling: 'group_by', 'resample', or
        'group_by_ref_and_resample_study'.
        'group_by' groups the data by the specified frequency, for example every
        January together.
        'resample' resamples the data to the specified frequency, for example
        the days of each month independently together.
        'group_by_ref_and_resample_study' groups the reference data by the
        specified frequency and resamples the study data to the same frequency.
        This allows for example to compare each January of the study period,
        independently, to every January of the reference period — typically to
        compare each month of the studied period to a normal (the reference)
        built from many aggregated years.
    is_compared_to_reference : bool
        If True, the 'resample' sampling method is rejected: it does not make
        sense to resample the reference variable if it is already a subsample
        of the studied variable.
    **kwargs
        Ignored keyword arguments (for compatibility).

    Returns
    -------
    DataArray
        The difference of means between the two climate variables.

    Notes
    -----
    This is a generification of the anomaly climate index.
    """
    ref = climate_vars[1].studied_data
    study = convert_units_to(climate_vars[0].studied_data, ref, context="hydro")
    return _reduce_and_diff(
        study,
        ref,
        to_percent=to_percent,
        resample_freq=resample_freq,
        sampling_method=sampling_method,
        is_compared_to_reference=is_compared_to_reference,
        reducer=lambda sampled: sampled.mean(dim="time"),
    )
[docs]
def percentile(
    climate_vars: list[ClimateVariable],
    resample_freq: Frequency,
    **kwargs,  # noqa: ARG001
) -> DataArray:
    """
    Calculate the percentile of the given climate variable.

    The data is resampled at the given frequency and the quantile matching the
    threshold's initial percentile value is computed with the threshold's
    interpolation method.

    Parameters
    ----------
    climate_vars : list[ClimateVariable]
        A single climate variable within a list.
    resample_freq : Frequency
        Resampling frequency of the output.
    **kwargs
        Ignored keyword arguments (for compatibility).

    Returns
    -------
    DataArray
        The calculated percentile as a DataArray, with the percentile values
        exposed through the 'percentiles' coordinate variable.
    """
    study, threshold = get_single_var(climate_vars)
    threshold: PercentileThreshold
    per_period = study.resample(time=resample_freq.pandas_freq)
    # The threshold stores a percentile (0-100); quantile expects 0-1.
    result = per_period.quantile(
        threshold.initial_value[0] * 0.01,
        method=threshold.interpolation.name,
    )
    # Expose the result on a 0-100 "percentiles" coordinate.
    result.coords["quantile"] = result.coords["quantile"] * 100
    result = result.rename(quantile="percentiles")
    return PercentileDataArray.from_da(
        source=result,
        climatology_bounds=build_climatology_bounds(study),
    )
def _reduce_and_diff(
    study: DataArray,
    ref: DataArray,
    to_percent: bool,
    resample_freq: Frequency,
    sampling_method: str,
    is_compared_to_reference: bool,
    reducer: Callable[[DataArrayResample | DataArray | DataArrayGroupBy], DataArray],
    use_reduce_unit: bool = False,
    **kwargs,  # noqa: ARG001
) -> DataArray:
    """
    Reduce `study` and `ref` according to `sampling_method`, then subtract them.

    The reduction (e.g. a time mean) is applied to both variables, either on
    groups, on resampled chunks, or on a mix of both, and the difference
    ``reduced_study - reduced_ref`` is returned. With `to_percent`, the
    difference is expressed as a percentage of the reduced reference.

    Raises
    ------
    InvalidIcclimArgumentError
        If `is_compared_to_reference` is True with the 'resample' method.
    NotImplementedError
        If `sampling_method` is not one of the known methods.
    """
    if is_compared_to_reference and sampling_method == RESAMPLE_METHOD:
        msg = (
            "It does not make sense to resample the reference variable if it is"
            " already a subsample of the studied variable. Try setting"
            f" `sampling_method='{GROUP_BY_REF_AND_RESAMPLE_STUDY_METHOD}'`"
            f" instead."
        )
        raise InvalidIcclimArgumentError(msg)
    if sampling_method == GROUP_BY_METHOD:
        if resample_freq.group_by_key == RUN_INDEXER:
            # No grouping key: reduce each variable as a single group.
            reduced_study = reducer(study)
            reduced_ref = reducer(ref)
        else:
            reduced_study = reducer(study.groupby(resample_freq.group_by_key))
            reduced_ref = reducer(ref.groupby(resample_freq.group_by_key))
    elif sampling_method == RESAMPLE_METHOD:
        reduced_study = reducer(study.resample(time=resample_freq.pandas_freq))
        reduced_ref = reducer(ref.resample(time=resample_freq.pandas_freq))
    elif sampling_method == GROUP_BY_REF_AND_RESAMPLE_STUDY_METHOD:
        if (
            resample_freq.group_by_key == RUN_INDEXER
            or resample_freq == FrequencyRegistry.YEAR
        ):
            reduced_study = reducer(study.resample(time=resample_freq.pandas_freq))
            # data is already filtered with only the indexed values.
            # Thus there is only one "group".
            reduced_ref = reducer(ref)
        else:
            # Align each resampled study chunk with its matching group of the
            # grouped reference (e.g. each study January vs the reference
            # January normal).
            return _reduce_and_diff_of_resampled_x_by_groupedby_y(
                resample_freq, to_percent, study, ref, reducer=reducer
            )
    else:
        msg = f"Unknown sampling_method: '{sampling_method}'."
        raise NotImplementedError(msg)
    diff = reduced_study - reduced_ref
    if to_percent:
        diff = diff / reduced_ref * 100
        diff.attrs[UNITS_KEY] = "%"
    elif use_reduce_unit:
        # The reducer may change the unit (e.g. an integral); propagate it.
        diff.attrs[UNITS_KEY] = reduced_study.attrs[UNITS_KEY]
    else:
        diff.attrs[UNITS_KEY] = study.attrs[UNITS_KEY]
    return diff
def _reduce_and_diff_of_resampled_x_by_groupedby_y(
    resample_freq: Frequency,
    to_percent: bool,
    study: DataArray,
    ref: DataArray,
    reducer: Callable[
        [DataArrayResample | DataArrayGroupBy | DataArray | ClimateVariable], DataArray
    ],
) -> DataArray:
    """
    Diff each resampled `study` chunk against its matching `ref` group.

    `ref` is grouped by the frequency's group-by key (month or day of year) and
    reduced once; each resampled chunk of `study` is then reduced and compared
    to the reference group it falls into. Only monthly and daily frequencies
    are supported; other frequencies raise NotImplementedError.
    """
    mean_ref = reducer(ref.groupby(resample_freq.group_by_key))
    acc = []
    if resample_freq == FrequencyRegistry.MONTH:
        key = "month"
        dt_selector = lambda x: x.time.dt.month  # noqa: E731
    elif resample_freq == FrequencyRegistry.DAY:
        key = "dayofyear"
        dt_selector = lambda x: x.time.dt.dayofyear  # noqa: E731
    else:
        msg = (
            f"Can't use {GROUP_BY_REF_AND_RESAMPLE_STUDY_METHOD}"
            f" with the frequency {resample_freq.long_name}."
        )
        raise NotImplementedError(msg)
    for label, sample in study.resample(time=resample_freq.pandas_freq):
        sample_mean = reducer(sample)
        # All time steps of a chunk share the same month/dayofyear, so the
        # first value identifies the matching reference group.
        ref_group_mean = mean_ref.sel({key: dt_selector(sample).to_numpy()[0]})
        sample_diff_of_means = sample_mean - ref_group_mean
        if to_percent:
            sample_diff_of_means = sample_diff_of_means / ref_group_mean * 100
        # Drop the group coordinate and re-index the chunk on its resample
        # label so the chunks can be concatenated along time.
        del sample_diff_of_means[key]
        sample_diff_of_means = sample_diff_of_means.expand_dims(time=[label])
        acc.append(sample_diff_of_means)
    diff_of_means = xr.concat(acc, dim="time")
    if to_percent:
        diff_of_means.attrs["units"] = "%"
    else:
        diff_of_means.attrs["units"] = study.attrs["units"]
    return diff_of_means
def _compute_exceedance(
    study: DataArray,
    threshold: Threshold,
    freq: str,  # used by @percentile_bootstrap (don't rename, it breaks bootstrap)
    bootstrap: bool,  # used by @percentile_bootstrap
) -> DataArray:
    """
    Compute the exceedances of `study` against `threshold`.

    When `bootstrap` is True, the threshold's climatology bounds are recorded
    on the result under REFERENCE_PERIOD_ID so downstream code can identify
    the reference period used for the bootstrap.
    """
    exceedances = threshold.compute(study, freq=freq, bootstrap=bootstrap)
    if bootstrap:
        exceedances.attrs[REFERENCE_PERIOD_ID] = threshold.value.attrs[
            "climatology_bounds"
        ]
    return exceedances
[docs]
def get_couple_of_var(
    climate_vars: list[ClimateVariable],
    indicator: str,
) -> tuple[DataArray, DataArray]:
    """
    Get exactly two climate variables to compute a climate indicator.

    The study variable is converted to the reference variable's units before
    being returned.

    Parameters
    ----------
    climate_vars : list[ClimateVariable]
        A list of climate variables.
    indicator : str
        The name of the indicator to be computed (used in error messages).

    Returns
    -------
    tuple[DataArray, DataArray]
        A tuple containing two DataArray objects representing the study
        variable and the reference variable.

    Raises
    ------
    InvalidIcclimArgumentError
        If the number of climate variables is not exactly 2, or if any of the
        two variables has a threshold.
    """
    if len(climate_vars) != 2:
        msg = (
            f"{indicator} needs two variables **or** one variable and a "
            f"`base_period_time_range` period to extract a reference variable."
        )
        raise InvalidIcclimArgumentError(msg)
    if climate_vars[0].threshold or climate_vars[1].threshold:
        msg = f"{indicator} cannot be computed with thresholds."
        raise InvalidIcclimArgumentError(msg)
    reference = climate_vars[1].studied_data
    studied = convert_units_to(
        climate_vars[0].studied_data, reference, context="hydro"
    )
    return studied, reference
def _run_rolling_reducer(
    climate_vars: list[ClimateVariable],
    resample_freq: Frequency,
    rolling_window_width: int,
    rolling_op: Callable[[DataArrayRolling], DataArray],  # sum | mean
    resampled_op: Callable[[DataArrayResample], DataArray],  # max | min
    date_event: bool,
    source_freq_delta: timedelta,
) -> DataArray:
    """Apply ``rolling_op`` over a rolling window, then reduce each resample
    period with ``resampled_op``.

    When the variable has a threshold, values not exceeding it are masked out
    before the rolling aggregation. With ``date_event``, the event dates are
    attached as coordinates on the result.
    """
    data, thresh = get_single_var(climate_vars)
    if thresh:
        mask = _compute_exceedance(
            study=data,
            freq=resample_freq.pandas_freq,
            threshold=thresh,
            bootstrap=must_run_bootstrap(data, thresh),
        ).squeeze()
        data = data.where(mask)
    rolled = rolling_op(data.rolling(time=rolling_window_width))
    resampled = rolled.resample(time=resample_freq.pandas_freq)
    if not date_event:
        return resampled_op(resampled, dim="time")
    return _reduce_with_date_event(
        resampled=resampled,
        reducer=resampled_op,
        window=rolling_window_width,
        source_delta=source_freq_delta,
    )
def _run_simple_reducer(
    climate_vars: list[ClimateVariable],
    resample_freq: Frequency,
    reducer_op: Callable[..., DataArray],
    date_event: bool,
    must_convert_rate: bool = False,
) -> DataArray:
    """
    Apply a simple reducer operation on a single climate variable.

    Parameters
    ----------
    climate_vars : list[ClimateVariable]
        List of climate variables to be processed.
    resample_freq : Frequency
        Frequency at which the data should be resampled.
    reducer_op : Callable[..., DataArray]
        Reducer operation to be applied on the data.
    date_event : bool
        When True, add the date of the event as a coordinate variable.
        Only works for `max` and `min` reducers.
    must_convert_rate : bool, optional
        When True, convert rate data (e.g. mm/day) to amounts before
        reducing. Defaults to False.

    Returns
    -------
    DataArray
        Result of the reducer operation applied on the climate variable.
    """
    study, threshold = get_single_var(climate_vars)
    if threshold is None:
        filtered = study
    else:
        # Keep only the values that exceed the threshold.
        mask = _compute_exceedance(
            study=study,
            freq=resample_freq.pandas_freq,
            threshold=threshold,
            bootstrap=must_run_bootstrap(study, threshold),
        ).squeeze()
        filtered = study.where(mask)
    if must_convert_rate and _is_rate(filtered):
        filtered = rate2amount(filtered)
    resampled = filtered.resample(time=resample_freq.pandas_freq)
    if date_event:
        return _reduce_with_date_event(resampled=resampled, reducer=reducer_op)
    return reducer_op(resampled, dim="time")
def get_single_var(
    climate_vars: list[ClimateVariable],
) -> tuple[DataArray, Threshold | None]:
    """
    Get the single variable's data and its threshold, if any.

    Parameters
    ----------
    climate_vars : list[ClimateVariable]
        A list of ClimateVariable objects; only the first one is used.

    Returns
    -------
    tuple[DataArray, Threshold | None]
        The first variable's data array and its threshold, or ``None``
        when no (truthy) threshold is attached.
    """
    first = climate_vars[0]
    return first.studied_data, (first.threshold or None)
def _compute_exceedances(
    climate_vars: list[ClimateVariable],
    resample_freq: str,
    logical_link: LogicalLink,
) -> DataArray:
    """Compute every variable's exceedance mask and merge the masks with
    ``logical_link`` (e.g. logical AND / OR)."""
    masks = []
    for cvar in climate_vars:
        bootstrap = must_run_bootstrap(cvar.studied_data, cvar.threshold)
        mask = _compute_exceedance(
            study=cvar.studied_data,
            threshold=cvar.threshold,
            freq=resample_freq,
            bootstrap=bootstrap,
        ).squeeze()
        masks.append(mask)
    return logical_link(masks)
def _get_thresholded_var(
climate_vars: list[ClimateVariable],
) -> tuple[DataArray, Threshold]:
if climate_vars[0].threshold:
return (
climate_vars[0].studied_data,
climate_vars[0].threshold,
)
msg = "No threshold found"
raise InvalidIcclimArgumentError(msg)
def _reduce_with_date_event(
    resampled: DataArrayResample,
    reducer: Callable[[DataArrayResample], DataArray],
    source_delta: timedelta | None = None,
    window: int | None = None,
) -> DataArray:
    """Reduce each resample group and attach the event date(s) as coordinates.

    Only ``max`` and ``min`` reducers are supported: the event date is the
    timestamp of the group's arg-max / arg-min along ``time``.
    When ``window`` is given (rolling case), a start/end date pair spanning
    ``window * source_delta`` is attached instead of a single event date.
    """
    acc: list[DataArray] = []
    # Map the resample-level reducer to the per-group arg-reducer used to
    # locate the event within each group.
    if reducer == DataArrayResample.max:
        group_reducer = DataArray.argmax
    elif reducer == DataArrayResample.min:
        group_reducer = DataArray.argmin
    else:
        msg = f"Can't compute `date_event` due to unknown reducer: '{reducer}'"
        raise NotImplementedError(msg)
    for label, sample in resampled:
        # Value (and timestamp) of the extreme within this group.
        reduced_result = sample.isel(time=group_reducer(sample, dim="time"))
        if window is not None:
            # NOTE(review): the reduced value written out is the group's sum,
            # not the extreme located above (`reduced_result`) — confirm this
            # is intended for max/min reducers.
            result = _add_date_coords(
                original_sample=sample,
                result=sample.sum(dim="time"),
                start_time=reduced_result.time,
                end_time=reduced_result.time + window * source_delta,
                label=label,
            )
        else:
            # NOTE(review): same remark as above — `sample.sum` vs the extreme.
            result = _add_date_coords(
                original_sample=sample,
                result=sample.sum(dim="time"),
                event_date=reduced_result.time,
                label=label,
            )
        acc.append(result)
    return xr.concat(acc, "time")
def _count_occurrences_with_date(resampled: DataArrayResample) -> DataArray:
    """Count occurrences per resample group, attaching the dates of the first
    and last occurrence as coordinates."""
    results: list[DataArray] = []
    for label, lazy_sample in resampled:
        # TODO @bzah: probably not safe to compute on huge dataset,
        # it should be fixed with
        # https://github.com/pydata/xarray/issues/2511
        sample = lazy_sample.compute()
        # First occurrence: argmax returns the first maximum along time.
        first = sample.isel(time=sample.argmax("time")).time
        # Last occurrence: run the same argmax on a time-reversed view.
        flipped = sample.reindex(time=list(reversed(sample.time.to_numpy())))
        last = flipped.isel(time=flipped.argmax("time")).time
        dated_occurrences = _add_date_coords(
            original_sample=sample,
            result=sample.sum(dim="time"),
            start_time=first,
            end_time=last,
            label=label,
        )
        results.append(dated_occurrences)
    return xr.concat(results, "time")
def _consecutive_occurrences_with_dates(
    resampled: DataArrayResample,
    source_freq_delta: timedelta,
) -> DataArray:
    """Reduce each resample group to its longest consecutive-occurrence run,
    attaching the run's start/end dates as coordinates.

    .. note::
        Assumes each group already encodes run lengths along ``time``
        (run-length style), so the arg-max locates the longest run —
        TODO(review): confirm against the callers' use of `run_length`.
    """
    acc = []
    for label, _sample in resampled:
        # Missing values would poison argmax; treat them as zero-length runs.
        sample = _sample.where(~_sample.isnull(), 0)
        time_index_of_max_rle = sample.argmax(dim="time")
        # TODO @bzah: `.compute` is needed until xarray merges this pr:
        # https://github.com/pydata/xarray/pull/5873
        time_index_of_max_rle = time_index_of_max_rle.compute()
        # Length of the longest run (value stored at the argmax position).
        dated_longest_run = sample[{"time": time_index_of_max_rle}]
        # Timestamp at the argmax position; the `.where(... > 0, 0)` fallback
        # presumably guards against degenerate indices — verify.
        start_time = sample.isel(
            time=time_index_of_max_rle.where(time_index_of_max_rle > 0, 0),
        ).time
        # End = start plus (run length * duration of one source step).
        end_time = start_time + (dated_longest_run * source_freq_delta)
        dated_longest_run = _add_date_coords(
            original_sample=sample,
            result=dated_longest_run,
            start_time=start_time,
            end_time=end_time,
            label=label,
        )
        acc.append(dated_longest_run)
    return xr.concat(acc, "time")
def _add_date_coords(
    original_sample: DataArray,
    result: DataArray,
    label: str | np.datetime64,
    start_time: DataArray = None,
    end_time: DataArray = None,
    event_date: DataArray = None,
) -> DataArray:
    """Rebuild ``result`` as a DataArray carrying the non-time coordinates of
    ``original_sample``, event-date coordinate(s), and ``label`` as time.

    Either pass ``event_date`` alone, or a ``start_time``/``end_time`` pair.
    """
    coords = {
        name: original_sample[name]
        for name in original_sample.coords
        if name != "time"
    }
    if event_date is not None:
        coords["event_date"] = event_date
    else:
        coords["event_date_start"] = start_time
        coords["event_date_end"] = end_time
    coords["time"] = label
    return DataArray(data=result, coords=coords)
def _to_percent(da: DataArray, sampling_freq: Frequency) -> DataArray:
    """Turn a per-period count into a part-of-period value by dividing by the
    (leap-year aware) number of days in each sampling period.

    Unsupported sampling frequencies are returned unchanged, with a warning.
    """

    def leap_aware_days(leap_days: int, regular_days: int) -> DataArray:
        # Per-period divisor: one value per time step, picked by leap-ness.
        divisor = xr.full_like(da, 1)
        is_leap = _is_leap_year(da)
        divisor[{"time": is_leap}] = leap_days
        divisor[{"time": ~is_leap}] = regular_days
        return divisor

    if sampling_freq == FrequencyRegistry.MONTH:
        da = da / da.time.dt.daysinmonth * 100
    elif sampling_freq == FrequencyRegistry.YEAR:
        da = da / leap_aware_days(366, 365)
    elif sampling_freq == FrequencyRegistry.AMJJAS:
        da = da / 183
    elif sampling_freq == FrequencyRegistry.ONDJFM:
        da = da / leap_aware_days(183, 182)
    elif sampling_freq == FrequencyRegistry.DJF:
        da = da / leap_aware_days(91, 90)
    elif sampling_freq in (FrequencyRegistry.MAM, FrequencyRegistry.JJA):
        da = da / 92
    elif sampling_freq == FrequencyRegistry.SON:
        da = da / 91
    else:
        # TODO @bzah: improve this for custom resampling
        # https://github.com/cerfacs-globc/icclim/issues/289
        warn(
            "For now, '%' unit can only be used when `slice_mode` is one of: "
            "{MONTH, YEAR, AMJJAS, ONDJFM, DJF, MAM, JJA, SON}.",
            stacklevel=2,
        )
        return da
    da.attrs[UNITS_KEY] = PART_OF_A_WHOLE_UNIT
    return da
def _is_leap_year(da: DataArray) -> np.ndarray:
    """Boolean mask of leap years along the time axis, honoring non-standard
    CF calendars when the time index is a ``CFTimeIndex``."""
    index = da.indexes.get("time")
    if not isinstance(index, xr.CFTimeIndex):
        return da.time.dt.is_leap_year
    calendar = CfCalendarRegistry.lookup(index.calendar)
    return calendar.is_leap(da.time.dt.year)
def _is_rate(query: Quantity | DataArray) -> bool:
    """Return True when ``query`` has "per time" units (e.g. mm/day)."""
    if isinstance(query, DataArray):
        units = str2pint(query.attrs[UNITS_KEY])
    else:
        units = query
    return units.dimensionality.get("[time]", None) == -1