Skip to content

Commit 4c850ca

Browse files
Add reduce transform (#252)
## Description A reduction transform that performs operations over a dimension into a new variable back into the collection. Current operations include: "mean", "sum", "min", "max", "std", and "identity". Optionally, can mask nan values and squeeze Z dimensions. ## Dependencies N/A ## Impact N/A
1 parent f9f85cb commit 4c850ca

1 file changed

Lines changed: 147 additions & 0 deletions

File tree

src/eva/transforms/reduce.py

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
"""
2+
EVA Transform: reduce
3+
=====================
4+
5+
A small, generic reduction transform that operates on a single DataArray in the
6+
DataCollections and writes the reduced result back into the collection.
7+
8+
1) Fetch a variable from DataCollections (using "Collection::Group::Variable")
9+
2) Optionally mask fill values to NaN so reductions can use `skipna=True`
10+
3) Optionally squeeze `binsZDim` by selecting index 0 (common when binsZDim == 1)
11+
4) Apply a reduction over one or more named dimensions
12+
5) Save the result back to DataCollections under "new name"
13+
14+
Supported operations: "mean", "sum", "min", "max", "std", and "identity"
15+
- "identity" does no reduction, but still applies mask/squeeze if requested.
16+
17+
---------------------------------------------------------------------------
18+
Example YAML
19+
---------------------------------------------------------------------------
20+
transforms:
21+
- transform: reduce
22+
source: "ObsMonitor::griddedBins_ombg_stationPressure::mean"
23+
new name: "ObsMonitor::griddedBins_ombg_stationPressure::mean_cycleAvg"
24+
op: "mean"
25+
dims: ["analysisCycle"]
26+
skipna: true
27+
squeeze_binsZ: false
28+
mask_fill: true
29+
30+
# 2-D output directly (no slicing later):
31+
- transform: reduce
32+
source: "ObsMonitor::griddedBins_ombg_stationPressure::mean"
33+
new name: "ObsMonitor::griddedBins_ombg_stationPressure::mean_cycleAvg_2d"
34+
op: "mean"
35+
dims: ["analysisCycle"]
36+
squeeze_binsZ: true
37+
mask_fill: true
38+
"""
39+
40+
from typing import List, Optional
41+
42+
import xarray as xr
43+
44+
from eva.utilities.logger import Logger
45+
from eva.utilities.config import get
46+
from eva.transforms.transform_utils import split_collectiongroupvariable
47+
48+
49+
def _mask_fill(da: xr.DataArray) -> xr.DataArray:
50+
"""Mask the DataArray's _FillValue (if present) to NaN."""
51+
if da is None:
52+
return da
53+
fv = da.attrs.get("_FillValue", None)
54+
if fv is None and hasattr(da, "encoding"):
55+
fv = da.encoding.get("_FillValue", None)
56+
return da.where(da != fv) if fv is not None else da
57+
58+
59+
def reduce(config: dict, data_collections) -> None:
60+
"""
61+
Reduce a DataArray over named dimensions and store the result back into EVA.
62+
63+
Parameters
64+
----------
65+
config : dict
66+
Transform configuration with the following keys:
67+
- source (str): "Collection::Group::Variable" to read
68+
- new name (str): "Collection::Group::NewVariable" to write
69+
- op (str): one of {"mean","sum","min","max","std","identity"}
70+
- dims (list[str], optional): dims to reduce over (e.g., ["analysisCycle"])
71+
- skipna (bool, optional, default=True): ignore NaNs in reductions
72+
- squeeze_binsZ (bool, optional, default=False):
73+
if True and "binsZDim" present, select index 0
74+
- mask_fill (bool, optional, default=True):
75+
mask `_FillValue` to NaN before reduction
76+
data_collections : eva.data.data_collections.DataCollections
77+
EVA DataCollections instance.
78+
79+
Returns
80+
-------
81+
None
82+
The reduced DataArray is added to DataCollections under "new name".
83+
84+
Notes
85+
-----
86+
- "identity" op is useful to apply mask/squeeze only (no reduction).
87+
- If `dims` is omitted or empty for reduction ops, the transform reduces over *all* dims.
88+
- Attributes/coords are preserved by xarray.
89+
"""
90+
logger = Logger("ReduceTransform")
91+
92+
# Required fields
93+
source = get(config, logger, "source")
94+
new_name = get(config, logger, "new name")
95+
op = get(config, logger, "op").lower()
96+
97+
# Optional fields
98+
dims: Optional[List[str]] = get(config, logger, "dims", default=None)
99+
skipna: bool = get(config, logger, "skipna", default=True)
100+
squeeze_bins: bool = get(config, logger, "squeeze_binsZ", default=False)
101+
mask_fill: bool = get(config, logger, "mask_fill", default=True)
102+
103+
# Parse source and fetch
104+
coll, group, var = split_collectiongroupvariable(logger, source)
105+
da: xr.DataArray = data_collections.get_variable_data_array(coll, group, var)
106+
107+
if da is None:
108+
raise ValueError(f"ReduceTransform: source '{source}' not found in DataCollections.")
109+
110+
# Mask fill value if requested
111+
if mask_fill:
112+
da = _mask_fill(da)
113+
114+
# Optional vertical squeeze (select first vertical bin)
115+
if squeeze_bins and "binsZDim" in da.dims:
116+
da = da.isel(binsZDim=0)
117+
118+
# Normalize dims: None -> all dims for reduction ops
119+
if op != "identity":
120+
if dims is None or len(dims) == 0:
121+
dims = list(da.dims)
122+
123+
# Dispatch operation
124+
if op == "mean":
125+
out = da.mean(dim=dims, skipna=skipna)
126+
elif op == "sum":
127+
out = da.sum(dim=dims, skipna=skipna)
128+
elif op == "min":
129+
out = da.min(dim=dims, skipna=skipna)
130+
elif op == "max":
131+
out = da.max(dim=dims, skipna=skipna)
132+
elif op == "std":
133+
out = da.std(dim=dims, skipna=skipna)
134+
elif op == "identity":
135+
out = da
136+
else:
137+
raise ValueError(f"ReduceTransform: unsupported op '{op}'.")
138+
139+
# Write back
140+
out_coll, out_group, out_var = split_collectiongroupvariable(logger, new_name)
141+
data_collections.add_variable_to_collection(out_coll, out_group, out_var, out)
142+
143+
logger.info(
144+
f"ReduceTransform: wrote '{new_name}' "
145+
f"(op={op}, dims={dims if op != 'identity' else 'n/a'}, skipna={skipna}, "
146+
f"squeeze_binsZ={squeeze_bins}, mask_fill={mask_fill})"
147+
)

0 commit comments

Comments
 (0)