Regrid to regular#

This script shows how to regrid ERA5 data from a reduced Gaussian grid to a regular lon-lat grid using linear interpolation.
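
On a reduced Gaussian grid each latitude ring carries its own number of longitudes, shrinking from 1280 at the equator towards the poles, while a regular lon-lat grid uses the same longitudes on every ring. A toy sketch of that layout, with made-up ring sizes:

import numpy as np

# made-up ring sizes; the real ERA5 grid has 1280 longitudes on the equator ring
rings = {0.0: 16, 45.0: 8, 80.0: 4}  # latitude -> number of longitudes
for lat, nlon in rings.items():
    lons = np.linspace(0.0, 360.0, nlon, endpoint=False)
    print(f"lat {lat:4.1f}: {nlon:2d} lons, spacing {360.0 / nlon:.2f} deg")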

import logging

import dask
import intake
import xarray as xr
from distributed import Client

# optionally run the computation on a dask distributed cluster:
# client = Client(silence_logs=logging.ERROR)
# client.close()  # shut the cluster down again when finished

User parameters:#

  • catalog: The catalog from GitLab is always reachable but uses simplecache, which can cause problems with asynchronous access

  • catalog_entry: One of the era5 time series available in list(cat) (see the sketch after this list)

  • target_global_vars: A list of variables that are defined on 1280 longitudes at the equator and should be interpolated

  • openchunks: A chunk setting for all dimensions that are not related to lat and lon. Larger values mean fewer chunks but require more memory (illustrated after the parameter cell below)

  • to_load_selection: The subset of the time series to which the workflow is applied
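
Before setting catalog_entry below, you can inspect what the catalog offers. A minimal sketch, using the catalog URL from the next cell:

import intake

cat = intake.open_catalog(
    "https://gitlab.dkrz.de/data-infrastructure-services/era5-kerchunks/-/raw/main/main.yaml"
)
print(list(cat))  # names of all available era5 time series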

catalog="https://gitlab.dkrz.de/data-infrastructure-services/era5-kerchunks/-/raw/main/main.yaml"
#catalog="/work/bm1344/DKRZ/git/era5-kerchunks/main.yaml"
catalog_entry="surface_analysis_daily"
target_global_vars=["2t"]
openchunks=dict(
    time=4,
    #level=5 #for 3d data
)
to_load_selection=dict(
    time="2010"
)
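
To see what the openchunks trade-off means, compare the number of dask blocks for two time chunk sizes. A toy sketch with made-up array sizes, not the real ERA5 dimensions:

import dask.array

# made-up shape: 360 time steps x 500000 grid points
small = dask.array.zeros((360, 500000), chunks=(4, 500000))
large = dask.array.zeros((360, 500000), chunks=(32, 500000))
print(small.numblocks, large.numblocks)  # (90, 1) vs (12, 1): fewer, larger tasks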

Open the catalog and load one record as the in-memory template for the dask functions:

dask.config.set({"logging.distributed": "error"})
cat = intake.open_catalog(catalog)
dssource = cat[catalog_entry](chunks=openchunks).to_dask()
# load the first index of every non-spatial dimension as an in-memory template
template_source = dssource[target_global_vars].isel(**{a: 0 for a in openchunks.keys()}).load()
Note that the kerchunk references point at GRIB files on the DKRZ filesystem. If those files are not reachable (e.g. /pool/data/ERA5 is not mounted), this cell fails with an error like

ReferenceNotReachable: Reference "2t/0.0" failed to fetch target ['/pool/data/ERA5/E5/sf/an/1D/167/E5sf00_1D_1940-01_167.grb', 0, 1085524]

In that case, run the notebook on a system where these paths are available.
  1. Unstack: Define the unstack function and its template

  • Select the equator longitudes for the interpolation

  • Chunk the entire (lat, lon) record as a single block

def unstack(ds):
    # turn the stacked 1-D "value" dimension into a 2-D (lat, lon) grid;
    # rings with fewer longitudes get NaNs at the missing lon positions
    return ds.rename({"value": "latlon"}).set_index(latlon=("lat", "lon")).unstack("latlon")

template_unstack = unstack(template_source)
# the equator ring holds the full set of target longitudes
equator_lons = template_unstack[target_global_vars].sel(lat=0.0, method="nearest").dropna(dim="lon")["lon"]
# one chunk spanning the entire lat/lon grid ...
latlonchunks = {
    a: len(template_unstack[a])
    for a in template_unstack.dims
}
# ... and the original chunking for all other dimensions
nolatlonchunks = {
    a: dssource[target_global_vars].chunksizes[a]
    for a in openchunks.keys()
}
template_unstack = template_unstack.chunk(**latlonchunks)
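
To make the unstack step concrete: on a reduced grid, rings away from the equator have fewer longitudes, so unstacking the 1-D value dimension into (lat, lon) introduces NaNs at the missing positions. A toy sketch with made-up values:

import numpy as np
import xarray as xr

stacked = xr.Dataset(
    {"2t": ("value", [280.0, 281.0, 282.0, 283.0, 270.0, 271.0])},
    coords={
        "lat": ("value", [0.0, 0.0, 0.0, 0.0, 60.0, 60.0]),       # equator ring: 4 lons
        "lon": ("value", [0.0, 90.0, 180.0, 270.0, 0.0, 180.0]),  # 60N ring: 2 lons
    },
)
toy = stacked.rename({"value": "latlon"}).set_index(latlon=("lat", "lon")).unstack("latlon")
print(toy["2t"].values)
# [[280. 281. 282. 283.]
#  [270.  nan 271.  nan]]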
  2. Interp: Interpolate all NaNs linearly along the longitude (periodic over 360°) and keep only the equator longitudes.

def interp(ds):
    # fill the NaNs from unstacking by linear interpolation along lon,
    # treating longitude as periodic, then keep only the equator longitudes
    return ds.interpolate_na(dim="lon", method="linear", period=360.0).reindex(lon=equator_lons)

template_interp = interp(template_unstack)
# broadcast both templates over the non-spatial dimensions and restore their chunking
template_unstack = template_unstack.expand_dims(**{a: dssource[a] for a in openchunks.keys()}).chunk(nolatlonchunks)
template_interp = template_interp.expand_dims(**{a: dssource[a] for a in openchunks.keys()}).chunk(nolatlonchunks)
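
The core of the interp step is interpolate_na with period=360.0, which treats longitude as cyclic so that gaps next to the 0/360 seam are filled from both sides. A toy sketch with made-up values:

import numpy as np
import xarray as xr

da = xr.DataArray(
    [270.0, np.nan, 271.0, np.nan],
    dims="lon",
    coords={"lon": [0.0, 90.0, 180.0, 270.0]},
)
print(da.interpolate_na(dim="lon", method="linear", period=360.0).values)
# [270.  270.5 271.  270.5] - the last gap is filled across the seam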

Define the workflow#

original = dssource[target_global_vars].sel(**to_load_selection)
# 1. unstack each block from the 1-D value dimension to a (lat, lon) grid
unstacked = original.map_blocks(unstack, template=template_unstack[target_global_vars].sel(**to_load_selection))
# 2. interpolate the gaps and keep the equator longitudes in each block
interped = unstacked.map_blocks(interp, template=template_interp.sel(**to_load_selection))
interped = dask.optimize(interped)[0]
interped
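
Both steps rely on map_blocks: the function runs once per dask block, and the template tells dask the structure of the result without computing it. A minimal sketch with synthetic data:

import numpy as np
import dask.array
import xarray as xr

ds = xr.Dataset(
    {"x": ("time", dask.array.arange(8, chunks=4))},
    coords={"time": np.arange(8)},
)

def double(block):
    # receives one in-memory block per chunk of the input
    return block * 2

template = ds * 2  # lazy result with the expected dims, coords and chunks
print(ds.map_blocks(double, template=template).compute()["x"].values)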

Run the workflow#

from dask.diagnostics import ProgressBar

with ProgressBar():
    t2 = interped.compute()

import hvplot.xarray
t2["2t"].hvplot.image(x="lon", y="lat")