# Harvest
Use metadata from the catalog's datasets to create a metadata base (esm-collection).
from copy import deepcopy
import intake
import yaml
import pandas as pd
import datetime
import json
import fsspec as fs
from copy import deepcopy
def get_sources(catalog,name=None):
    """Recursively collect dotted source names from an intake catalog.

    Descends into sub-catalogs and returns a flat list of fully
    qualified entry names joined with '.'; a leading "main" prefix is
    dropped when a sub-catalog is entered, and the helper entries
    "csv" and "esm-json" are skipped.
    """
    # Prefix for this level: parent name + this catalog's name, empty parts removed.
    newname = '.'.join(part for part in (name, catalog.name) if part)
    collected = []
    for key, entry in catalog.items():
        if key in ("csv", "esm-json"):
            continue
        if isinstance(entry, intake.catalog.Catalog):
            # Do not carry the top-level "main" prefix into sub-catalogs.
            if newname == "main":
                newname = None
            collected.extend(get_sources(entry, newname))
        elif isinstance(entry, intake.source.base.DataSource):
            collected.append(newname + "." + key)
    return collected
# Open the top-level DKRZ catalog and flatten it into dotted source names.
cat=intake.open_catalog("../../main.yaml")
sources=get_sources(cat)
# Unique dataset names (second dot-separated component), displayed for inspection.
list(set([a.split('.')[1] for a in sources]))
['surface_analysis_monthly',
'pressure-level_analysis_hourly',
'pressure-level_analysis_monthly',
'surface_analysis_daily',
'surface_forecast_hourly',
'pressure-level_analysis_daily',
'surface_forecast_monthly',
'surface_analysis_hourly']
# DRS template: the '_'-separated parts of a dataset name map onto these keys.
drs="levelType_dataType_frequency"
# Raw YAML of the catalog; used by map_drs instead of cat.describe().
catraw=yaml.safe_load(cat.text)
def map_drs(olds):
    """Map a dotted source name onto a flat metadata record.

    Parameters
    ----------
    olds : str
        Fully qualified source name; its second dot-separated component
        must be a key of ``catraw["sources"]`` and follow the ``drs``
        naming template (module-level globals ``catraw`` and ``drs``
        are read here).

    Returns
    -------
    dict
        Selected catalog metadata plus the DRS components parsed from
        the source name, the storage format, and the raw urlpath.
    """
    s = olds.split('.')[1]
    description = catraw["sources"][s]
    metadata = description["metadata"]
    metadata_keys = [
        "format", "grid_id", "member_id", "institution_id", "institution",
        "references", "simulation_id", "variables", "variable-long_names",
    ]
    filtered = {k: metadata[k] for k in metadata_keys if k in metadata}
    mapped = {"format": "netcdf"}  # default; refined from the urlpath below
    mapped.update(filtered)
    if "variables" not in filtered and "parameters" in description:
        # Fall back to the allowed values of the "variables" user parameter.
        mapped["variables"] = description["parameters"]["variables"]["allowed"]
    # Split the dataset name along the DRS template, e.g.
    # "surface_analysis_daily" -> levelType / dataType / frequency.
    name_parts = s.split('_')
    for idx, k in enumerate(drs.split('_')):
        mapped[k] = name_parts[idx]
    if "aggregation" in mapped and mapped["aggregation"].endswith('0'):
        mapped["aggregation"] += s.split('.')[-1]
    urlpath = description["args"]["urlpath"]
    # A list urlpath means several files; the first decides the format.
    first_path = urlpath[0] if isinstance(urlpath, list) else urlpath
    if first_path.startswith("reference"):
        mapped["format"] = "zarr"  # kerchunk reference filesystem
    elif first_path.endswith(".grib"):
        mapped["format"] = "grib"
    mapped["urlpath"] = description["args"]["urlpath"]
    return mapped
# One row per data source: the flat metadata records built by map_drs.
df=pd.DataFrame(list(map(map_drs,sources)))
df
format | grid_id | institution_id | institution | references | simulation_id | variable-long_names | variables | levelType | dataType | frequency | urlpath | |
---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | zarr | N320 | ECMWF-DKRZ | Data from European Centre for Medium-Range Wea... | Hersbach, H., Bell, B., Berrisford, P., Hiraha... | ERA5 | [Fraction of cloud cover, Specific cloud ice w... | [cc, ciwc, clwc, crwc, cswc, d, o3, pv, q, r, ... | pressure-level | analysis | daily | reference::{{CATALOG_DIR}}/kerchunks/file/parq... |
1 | zarr | N320 | ECMWF-DKRZ | Data from European Centre for Medium-Range Wea... | Hersbach, H., Bell, B., Berrisford, P., Hiraha... | ERA5 | [Fraction of cloud cover, Specific cloud ice w... | [cc, ciwc, clwc, crwc, cswc, d, o3, pv, q, r, ... | pressure-level | analysis | hourly | [reference::{{CATALOG_DIR}}/kerchunks/file/par... |
2 | zarr | N320 | ECMWF-DKRZ | Data from European Centre for Medium-Range Wea... | Hersbach, H., Bell, B., Berrisford, P., Hiraha... | ERA5 | [Fraction of cloud cover, Specific cloud ice w... | [cc, ciwc, clwc, crwc, cswc, d, o3, pv, q, r, ... | pressure-level | analysis | monthly | reference::{{CATALOG_DIR}}/kerchunks/file/parq... |
3 | zarr | N320 | ECMWF-DKRZ | Data from European Centre for Medium-Range Wea... | Hersbach, H., Bell, B., Berrisford, P., Hiraha... | ERA5 | [100 metre U wind component, 100 metre V wind ... | [100u, 100v, 10u, 10v, 2d, 2t, asn, ci, fal, f... | surface | analysis | daily | reference::{{CATALOG_DIR}}/kerchunks/file/parq... |
4 | zarr | N320 | ECMWF-DKRZ | Data from European Centre for Medium-Range Wea... | Hersbach, H., Bell, B., Berrisford, P., Hiraha... | ERA5 | [100 metre U wind component, 100 metre V wind ... | [100u, 100v, 10u, 10v, 2d, 2t, asn, ci, fal, f... | surface | analysis | hourly | reference::{{CATALOG_DIR}}/kerchunks/file/parq... |
5 | zarr | N320 | ECMWF-DKRZ | Data from European Centre for Medium-Range Wea... | Hersbach, H., Bell, B., Berrisford, P., Hiraha... | ERA5 | [100 metre U wind component, 100 metre V wind ... | [100u, 100v, 10si, 10u, 10v, 2d, 2t, asn, ci, ... | surface | analysis | monthly | reference::{{CATALOG_DIR}}/kerchunks/file/parq... |
6 | zarr | N320 | ECMWF-DKRZ | Data from European Centre for Medium-Range Wea... | Hersbach, H., Bell, B., Berrisford, P., Hiraha... | ERA5 | [10 metre wind gust since previous post-proces... | [10fg, bld, blh, cape, cbh, cdir, cp, crr, csf... | surface | forecast | hourly | reference::{{CATALOG_DIR}}/kerchunks/file/parq... |
7 | zarr | N320 | ECMWF-DKRZ | Data from European Centre for Medium-Range Wea... | Hersbach, H., Bell, B., Berrisford, P., Hiraha... | ERA5 | [10 metre wind gust since previous post-proces... | [10fg, bld, blh, cape, cbh, cdir, cp, crr, csf... | surface | forecast | monthly | reference::{{CATALOG_DIR}}/kerchunks/file/parq... |
#df["variables"]=df["variables"].astype(str)
#df["variable-long_names"]=df["variable-long_names"].astype(str)
#df["urlpath"]=df["urlpath"].astype(str)
# Persist the assets table; the esm-collection JSON built below refers to it.
df.to_csv("dkrz_era5_disk.csv.gz", index=False)
# Fetch the JASMIN esm-collection description and use it as a template.
# Use a context manager so the remote file handle is closed (the previous
# fs.open(...).open() chain leaked it).
with fs.open("https://raw.githubusercontent.com/eerie-project/intake_catalogues/main/jasmin/jasmin-catalogue.json") as f:
    template = json.load(f)
descriptive_json = deepcopy(template)
# Assets: the data location lives in the "urlpath" column, and the storage
# format varies per row, so point at a format column instead of a fixed format.
descriptive_json["assets"]['column_name'] = 'urlpath'
del descriptive_json["assets"]['format']
descriptive_json["assets"]['format_column_name'] = 'format'
# One attribute entry per DataFrame column; no controlled vocabularies here.
descriptive_json["attributes"] = [
    {'column_name': a, 'vocabulary': ''}
    for a in df.columns
]
descriptive_json["aggregation_control"]["variable_column_name"] = "variables"
# Group datasets by the DRS components parsed from the source names.
descriptive_json["aggregation_control"]["groupby_attrs"] = drs.split('_')
descriptive_json["aggregation_control"]["aggregations"] = [
    {
        'type': 'union',
        'attribute_name': 'variables',
        'options': {}
    }
]
descriptive_json["id"] = "dkrz-catalogue"
descriptive_json["last_updated"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# The template references its own CSV; ours is supplied explicitly below.
del descriptive_json["catalog_file"]
#save this in github:
with open("dkrz_era5_disk.json", "w") as f:
    json.dump(
        descriptive_json,
        f,
        sort_keys=True,
        indent=4,
        separators=(',', ': ')
    )
# Build an in-memory intake-esm datastore from the description + assets table.
# "variables" and "variable-long_names" hold list values per row, so they are
# declared as columns with iterables.
esmcat=intake.open_esm_datastore(
    obj=dict(
        esmcat=descriptive_json,
        df=df
    ),
    columns_with_iterables=["variables","variable-long_names"]
)
/root/micromamba/envs/dkrzcatalog/lib/python3.12/site-packages/fastprogress/fastprogress.py:107: UserWarning: Couldn't import ipywidgets properly, progress bar will use console behavior
warn("Couldn't import ipywidgets properly, progress bar will use console behavior")
esmcat
dkrz-catalogue catalog with 8 dataset(s) from 8 asset(s):
unique | |
---|---|
format | 1 |
grid_id | 1 |
institution_id | 1 |
institution | 1 |
references | 1 |
simulation_id | 1 |
variable-long_names | 110 |
variables | 110 |
levelType | 2 |
dataType | 2 |
frequency | 3 |
urlpath | 50 |
derived_variables | 0 |