"""Cmorizer
ERA5 cmorizer is mostly based on the official ECMWF documentation of converting GRIB to NetCDF:
https://confluence.ecmwf.int/display/OIFS/How+to+convert+GRIB+to+netCDF
"""
import os
import subprocess
from os import path as op
from pprint import pprint
from warnings import warn
from importlib.resources import files
import copy
import pandas as pd
import xarray as xr
from cdo import Cdo
from ..utils import read_yaml
# path and file templates at DKRZ
# see https://docs.dkrz.de/doc/dataservices/finding_and_accessing_data/era_data/#file-and-directory-names
dkrz_template = {
"path_template": "/pool/data/ERA5/{era_id}/{level_type}/{dataType}/{frequency}/{code:03d}",
"file_template": "{era_id}{level_type}{typeid}_{frequency}_{date}_{code:03d}.grb",
}
era5_params_file = files("pyremo.preproc").joinpath("era5-params.yaml")
era5_grid_file = files("pyremo.preproc").joinpath("grid.txt")
ecmwf_params_file = files("pyremo.preproc").joinpath("ecmwf_128.csv")
era5_params_config = read_yaml(era5_params_file)
era5_params = {
k: era5_params_config.get("defaults", {}) | v
for k, v in era5_params_config["parameters"].items()
}
ecmwf_params = pd.read_csv(ecmwf_params_file)
def get_output_filename(date, expid, path=None):
"""Build standard REMO gfile output filename.
Parameters
----------
date : str or datetime-like
Datetime used in the filename; formatted as ``YYYYMMDDHH``.
expid : str
Experiment identifier to include in the filename (e.g., ``"000000"``).
path : str, optional
Directory where the file will be written. Defaults to current directory.
Returns
-------
str
Full path to the output file in the form ``g{expid}a{YYYYMMDDHH}.nc``.
"""
if path is None:
path = "./"
date = pd.to_datetime(date).strftime("%Y%m%d%H")
return op.join(path, f"g{expid}a{date}.nc")
def params_by_code(params, code):
"""Return parameter configuration matching a GRIB code.
Parameters
----------
params : dict
Mapping of variable name to parameter configuration dictionaries.
code : int
GRIB parameter code to search for.
Returns
-------
dict or None
The matching parameter configuration merged with ``{"rename": varname}``,
or ``None`` if no match is found.
"""
for k, v in params.items():
c = v.get("code", -999)
if code == c:
return v | {"rename": k}
return None
def renames_by_code(ds, params):
"""Build a rename mapping based on GRIB codes in a dataset.
Parameters
----------
ds : xarray.Dataset
Dataset whose variables include a ``code`` attribute.
params : dict
Parameter configuration used to map GRIB codes to target names.
Returns
-------
dict
Mapping from original variable names to target names.
"""
renames = {}
for var in ds.data_vars:
code = ds[var].attrs.get("code", -999)
param = params_by_code(params, code)
if param:
renames[var] = param["rename"]
return renames
def get_params(config):
"""Expand parameter configuration with defaults.
Parameters
----------
config : dict
Configuration containing a ``defaults`` dict and a ``parameters`` mapping.
Returns
-------
dict
Mapping of parameter keys to dictionaries with defaults applied.
"""
defaults = config.get("defaults", {})
return {k: defaults | v for k, v in config["parameters"].items()}
def check_search(result):
"""Validate an intake search result and return a single path.
Parameters
----------
result : pandas.DataFrame
DataFrame returned by ``catalog.search(...).df``.
Returns
-------
str or None
The selected path if a unique result exists, otherwise ``None``.
"""
if len(result) == 0:
warn(f"nothing found: {len(result)}")
return None
if len(result) > 1:
pass
# warn(f"result not unique: {len(result)}, {result}")
return result.iloc[0].path
def get_file_from_intake(cat, params, date=None):
"""Query an intake catalog for a single parameter entry.
Parameters
----------
cat : intake.esm.EsmDatastore
Intake catalog to query.
params : dict
Search parameters describing the ERA5 entry (e.g., code, level_type).
date : str or datetime-like, optional
Date used to filter entries; coerced to ``YYYY-MM-DD``. If the
parameter frequency is invariant, ``INVARIANT`` is used.
Returns
-------
intake.catalog.local.LocalCatalogEntry
The search result object; call ``.df`` to access the underlying table.
"""
if date is not None:
date = pd.to_datetime(date).strftime("%Y-%m-%d")
freq = params.get("frequency", None)
if freq is None or freq == "invariant":
date = "INVARIANT"
return cat.search(**params, validation_date=date)
def get_files_from_intake(cat, params, date=None):
"""Collect file paths from an intake catalog for multiple variables.
Parameters
----------
cat : intake.esm.EsmDatastore
Intake catalog to query.
params : dict
Mapping from variable name to search parameters.
date : str or datetime-like, optional
Date used to filter entries.
Returns
-------
dict
Mapping from variable name to resolved file path (or ``None`` if not found).
"""
files = {}
for k, v in params.items():
f = check_search(get_file_from_intake(cat, params=v, date=date).df)
if not f:
warn(f"no result for {k} --> params: {v}")
files[k] = f
return files
def get_file_from_template(
date,
era_id,
frequency,
dataType,
code,
level_type,
template=None,
**kwargs,
):
"""Derive a filename from the DKRZ template.
Parameters
----------
date : str or datetime-like
Date to include in the path/filename; becomes ``YYYY-MM-DD`` unless invariant.
era_id : str
ERA dataset identifier (e.g., ``"E5"`` or ``"E1"``).
frequency : {"hourly", "daily", "monthly", "invariant"}
Data frequency.
dataType : {"an", "fc"}
Data type used to derive the type id in the filename.
code : int
GRIB parameter code.
level_type : {"model_level", "surface"}
Level type selector.
template : dict, optional
Template dictionary with ``path_template`` and ``file_template``. Defaults to
the built-in DKRZ template.
Returns
-------
str
Absolute template-expanded path to the GRIB file.
"""
if template is None:
template = dkrz_template
path_template = template["path_template"]
file_template = template["file_template"]
lt = {
"model_level": "ml",
"surface": "sf",
}
freqs = {
"hourly": "1H",
"daily": "1D",
"monthly": "1M",
"invariant": "IV",
}
typeids = {
"an": "00",
"fc": "12",
}
level_type = lt.get(level_type, level_type)
frequency = freqs.get(frequency, frequency)
typeid = typeids.get(dataType, dataType)
if frequency == "IV":
date = "INVARIANT"
else:
date = pd.to_datetime(date).strftime("%Y-%m-%d")
filepath_template = op.join(path_template, file_template)
return filepath_template.format(
date=date,
era_id=era_id,
frequency=frequency,
dataType=dataType,
code=code,
level_type=level_type,
typeid=typeid,
)
def get_files_from_template(params, date, template=None):
"""Resolve GRIB file paths for multiple variables from a template.
Parameters
----------
params : dict
Mapping from variable name to template fields (see
``get_file_from_template``).
date : str or datetime-like
Desired date.
template : dict, optional
Template dictionary with ``path_template`` and ``file_template``.
Returns
-------
dict
Mapping from variable name to file path.
"""
files = {}
for k, v in params.items():
f = get_file_from_template(date=date, template=template, **v)
if not op.isfile(f) and v.get("era_id") == "E1":
warn(f"file not found: {f}, will try E5")
v["era_id"] = "E5"
f = get_file_from_template(date=date, template=template, **v)
files[k] = f
return files
[docs]
class ERA5:
"""
Class for cmorizing original ERA5 GRIB data.
Notes
-----
The cmorizer class mostly works with the intake catalog provided by DKRZ.
References
----------
Please refer to the DKRZ data pool documentation: https://docs.dkrz.de/doc/dataservices/finding_and_accessing_data/era_data
"""
dynamic = ["ta", "hus", "ps", "tos", "sic", "clw", "snd"]
wind = ["svo", "sd"]
fx = ["orog", "sftlf"]
soil_vars = [
"tsl1",
"tsl2",
"tsl3",
"tsl4",
"swvl1",
"swvl2",
"swvl3",
"swvl4",
"tsn",
"src",
"skt",
"tos",
"sic",
"skt",
"snd",
]
chunks = {}
options = "-f nc4"
def __init__(
self, params=None, cat=None, gridfile=None, scratch=None, template=None
):
"""Create a new ERA5 cmorizer instance.
Parameters
----------
params : dict, optional
Parameter configuration mapping (defaults to built-in ERA5 parameters).
cat : intake.esm.EsmDatastore or str, optional
Intake catalog or path/URL to an ESM datastore.
gridfile : str, optional
Path to CDO grid description file. Defaults to package resource.
scratch : str, optional
Temporary directory for CDO operations. Defaults to ``$SCRATCH`` or ``."``.
template : dict, optional
File path template mapping. Defaults to the DKRZ template.
"""
if isinstance(cat, str):
import intake
self.cat = intake.open_esm_datastore(cat)
else:
self.cat = cat
if scratch is None:
scratch = os.environ.get("SCRATCH", "./")
self.scratch = scratch
if params is None:
params = era5_params
self.params = params
if gridfile is None:
gridfile = era5_grid_file
self.gridfile = gridfile
if template is None:
template = dkrz_template
self.template = template
self.cdo = Cdo(tempdir=scratch)
def _get_files(self, date, variables=None):
"""Resolve input files for a set of variables and a given date.
Parameters
----------
date : str or datetime-like
Target date for selection.
variables : list of str, optional
Variable names to include. Defaults to dynamic, fx, and wind variables.
Returns
-------
dict
Mapping from variable name to file path.
"""
if variables is None:
variables = self.dynamic + self.fx + self.wind
if self.cat:
return get_files_from_intake(
self.cat,
{k: v for k, v in self.params.items() if k in variables},
date,
)
else:
return get_files_from_template(
date=date,
template=self.template,
params={k: v for k, v in self.params.items() if k in variables},
)
def _seldate(self, filename, date):
"""Build a CDO command to select a single date from a file."""
return f"-seldate,{date} {filename}"
# return self.cdo.seldate(date, input=filename)
def _seldates(self, filenames, date):
"""Apply ``_seldate`` to all non-invariant files in a mapping."""
return {
v: (
self._seldate(f, date)
if self.params[v].get("frequency") != "invariant"
else f
)
for v, f in filenames.items()
}
def _to_regulars(self, filenames, gridtypes):
"""Convert multiple inputs to regular Gaussian grid commands."""
return {
v: self._to_regular(f, gridtype=gridtypes[v], setname=v)
for v, f in filenames.items()
if v in self.dynamic + self.fx + self.soil_vars
}
def _get_gridtypes(self, filenames):
"""Determine grid types for all input files using CDO griddes."""
return {v: self._gridtype(f) for v, f in filenames.items()}
def _open_dsets(self, filenames):
"""Open a mapping of files as chunked ``xarray.Dataset`` objects."""
dsets = {}
for v, f in filenames.items():
ds = xr.open_dataset(f, chunks=self.chunks)
if v in self.fx:
# squeeze out time
ds = ds.squeeze(drop=True)
dsets[v] = ds
return dsets
def _griddes(self, filename):
"""Return parsed CDO griddes information as a dictionary."""
griddes = self.cdo.griddes(input=filename)
return {
entry.split("=")[0].strip(): entry.split("=")[1].strip()
for entry in griddes
if "=" in entry
}
def _gridtype(self, filename):
"""Extract grid type from CDO griddes output."""
return self._griddes(filename)["gridtype"]
def _to_regular(self, filename, gridtype=None, setname="", table="ecmwf"):
"""Convert ECMWF GRIB to regular Gaussian NetCDF (CDO command).
CDO is used depending on the detected ``gridtype``:
- ``gaussian_reduced`` → ``setgridtype,regular``
- ``spectral`` → ``sp2gpl``
- ``gaussian`` → passthrough
This follows the ECMWF ERA5 recommendation.
"""
if table is None:
table = ""
# from cdo import Cdo
# cdo = Cdo(tempdir=scratch)
if gridtype is None:
gridtype = self._gridtype(filename)
# options = f"-f nc4 -t {table}"
if setname:
setname = f"-setname,{setname}" # {filename}"
if gridtype == "gaussian_reduced":
gaussian = "-setgridtype,regular"
elif gridtype == "spectral":
gaussian = "-sp2gpl"
elif gridtype == "gaussian":
gaussian = ""
else:
raise Exception(
"unknown grid type for conversion to regular grid: {}".format(gridtype)
)
command = f"{setname} {gaussian} {filename}"
return command
def _compute_wind(self, vort, div):
"""Compute wind from vorticity and divergence (CDO command)."""
return f"-chname,u,ua,v,va -dv2uvl -merge [ {vort} {div} ]"
[docs]
def gfile(self, date, path=None, expid=None, filename=None, add_soil=False):
"""Create an ERA5 gfile NetCDF containing required variables.
Converts ERA5 GRIB inputs to a regular Gaussian grid and merges all
required variables into a single file for REMO preprocessing.
Parameters
----------
date : str or datetime-like
Date (ISO 8601) for which variables should be converted.
path : str, optional
Output directory for the gfile.
expid : str, optional
Experiment id for filename templating. Defaults to ``"000000"``.
filename : str, optional
Full output filename. If omitted, it is composed from ``path`` and
``expid``.
add_soil : bool, optional
If True, include additional soil variables.
Returns
-------
str
The output filename.
"""
if expid is None:
expid = "000000"
if filename is None:
filename = get_output_filename(date, expid, path)
variables = self.dynamic + self.fx + self.wind
if add_soil is True:
variables += self.soil_vars
print(f"output filename: {filename}")
# gridfile = "/work/ch0636/g300046/remo/era5-cmor/notebooks/grid.txt"
print("getting files...")
files = self._get_files(date, variables=variables)
pprint(f"using files: \n{files}")
print("getting gridtypes...")
gridtypes = self._get_gridtypes(files)
print("selecting dates...")
seldates = self._seldates(files, date)
print("convert to regular grid...")
regulars = self._to_regulars(seldates, gridtypes)
print("computing wind...")
wind = self._compute_wind(seldates["svo"], seldates["sd"])
merge = (
f"-setgrid,{self.gridfile} -merge [ "
+ " ".join(list(regulars.values()) + [wind])
+ " ]"
)
call = f"cdo {self.options} invertlev -invertlat {merge} {filename}"
print(f"execute: {call}")
subprocess.run(
call.split(),
check=True,
shell=False,
)
# stdout, stderr = process.communicate()
return filename
[docs]
def get_soil(self, date, filename=None):
"""Create a NetCDF with ERA5 soil-related variables.
Parameters
----------
date : str or datetime-like
Target date.
filename : str, optional
Output filename. Defaults to ``$SCRATCH/era5_soil_data.nc``.
Returns
-------
str
The output filename.
"""
if not isinstance(date, str):
date = date.strftime("%Y-%m-%dT%H:%M:%S")
if filename is None:
filename = op.join(self.scratch, "era5_soil_data.nc")
files = self._get_files(date, self.soil_vars + self.fx)
gridtypes = self._get_gridtypes(files)
seldates = self._seldates(files, date)
regulars = self._to_regulars(seldates, gridtypes)
merge = (
f"-setgrid,{self.gridfile} -merge [ "
+ " ".join(list(regulars.values()))
+ " ]"
)
call = f"cdo {self.options} invertlat {merge} {filename}"
print(f"execute: {call}")
subprocess.run(
call.split(),
check=True,
shell=False,
)
return filename
def era5_from_gcloud(time=None, chunks="auto", hfreq=6):
"""
Create ERA5 input for preprocessing from Google Cloud.
Parameters
----------
time : str or datetime-like, optional
The time range to select. Defaults to None.
chunks : str or dict, optional
The chunk size for Dask. Defaults to "auto".
hfreq : int, optional
The hourly frequency to select. Defaults to 6.
Returns
-------
xarray.Dataset
The selected ERA5 dataset.
"""
# era5_to_cmip_cf = {
# "temperature": "ta",
# "surface_pressure": "ps",
# "u_component_of_wind": "ua",
# "v_component_of_wind": "va",
# "specific_humidity": "hus",
# "specific_cloud_liquid_water_content": "clw",
# "geopotential_at_surface": "orog",
# "land_sea_mask": "sftlf",
# "sea_ice_cover": "sic",
# "snow_depth": "snd",
# "sea_surface_temperature": "tos",
# }
era5_to_cmip_cf = {v["gc_name"]: k for k, v in era5_params.items()}
if chunks == "auto":
chunks = {"time": 48}
ds = xr.open_zarr(
"gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3",
chunks=chunks,
storage_options=dict(token="anon"),
consolidated=True,
)
ar_full_37_1h = ds.sel(
time=slice(ds.attrs["valid_time_start"], ds.attrs["valid_time_stop"], hfreq)
)
ds = xr.open_zarr(
"gs://gcp-public-data-arco-era5/ar/model-level-1h-0p25deg.zarr-v1",
chunks=chunks,
storage_options=dict(token="anon"),
consolidated=True,
)
ar_native_vertical_grid_data = ds.sel(
time=slice(ds.attrs["valid_time_start"], ds.attrs["valid_time_stop"], hfreq)
)
if time:
ar_full_37_1h = ar_full_37_1h.sel(time=time)
ar_native_vertical_grid_data = ar_native_vertical_grid_data.sel(time=time)
level_vars = [
v for v in era5_to_cmip_cf.keys() if v in ar_native_vertical_grid_data.data_vars
]
surface_vars = [v for v in era5_to_cmip_cf.keys() if v in ar_full_37_1h.data_vars]
ds = xr.merge(
[
ar_native_vertical_grid_data[level_vars],
ar_full_37_1h[surface_vars].drop_dims("level"),
]
)
ds = ds.rename(era5_to_cmip_cf)
# if freq:
# ds = ds.isel(time=slice(0, None, freq))
for v in ["orog", "sftlf"]:
ds[v] = ds[v].isel(time=0).squeeze(drop=True)
pv = ds.ta.GRIB_pv
n = len(pv)
hyai, hybi = pv[0 : n // 2], pv[n // 2 : n]
ds["akgm"] = xr.DataArray(hyai, dims="nhyi")
ds["bkgm"] = xr.DataArray(hybi, dims="nhyi")
ds = ds.rename(hybrid="lev", latitude="lat", longitude="lon").cf.guess_coord_axis(
verbose=True
)
ds.lev.attrs["positive"] = "down"
# invert lat
ds = ds.reindex(lat=ds.lat[::-1])
return ds
def era5_gfile_from_dkrz(date, path, expid="000000", add_soil=False):
"""
Compute the gfile for a given date.
Parameters
----------
date : datetime-like
The date for which to compute the gfile.
path : str
The path to save the gfile.
expid : str
The experiment ID.
Returns
-------
str
The path to the computed gfile.
"""
params = copy.deepcopy(era5_params)
if date.year in range(2000, 2007):
for k, v in params.items():
# if v["level_type"] == "model_level":
v["era_id"] = "E1"
era5 = ERA5(params, gridfile=era5_grid_file)
return era5.gfile(
date.strftime("%Y-%m-%dT%H:%M:%S"), path, expid, add_soil=add_soil
)