# Harvest

Use metadata from catalog datasets to create a metadata database.

from copy import deepcopy
import intake
import yaml
import pandas as pd
import datetime
import json
import fsspec as fs
from copy import deepcopy
def get_sources(catalog, name=None):
    """Recursively collect fully qualified data-source names from an intake catalog.

    Parameters
    ----------
    catalog : intake.catalog.Catalog
        Catalog to walk.
    name : str, optional
        Dotted prefix accumulated from parent catalogs.

    Returns
    -------
    list of str
        One ``"<prefix>.<key>"`` entry per DataSource found, depth first.
    """
    newname = '.'.join(a for a in [name, catalog.name] if a)
    data_sources = []

    for key, entry in catalog.items():
        # Skip the machine-readable index entries of the catalog itself.
        if key in ("csv", "esm-json"):
            continue
        if isinstance(entry, intake.catalog.Catalog):
            # Drop the top-level "main" prefix so source names start at the
            # first subcatalog level. Use a local child prefix instead of
            # mutating ``newname``: the original rebinding to None would make
            # any DataSource iterated afterwards crash on ``None + "." + key``.
            child = None if newname == "main" else newname
            # If the entry is a subcatalog, recursively search it.
            data_sources.extend(get_sources(entry, child))
        elif isinstance(entry, intake.source.base.DataSource):
            data_sources.append(newname + "." + key)

    return data_sources
# Open the top-level intake catalog and harvest every data-source name it
# (transitively) contains.
cat=intake.open_catalog("../../main.yaml")
sources=get_sources(cat)
# Inspect the distinct dataset ids (second component of the dotted names);
# bare expression so the notebook displays the resulting list.
list(set([a.split('.')[1] for a in sources]))
['surface_analysis_monthly',
 'pressure-level_analysis_hourly',
 'pressure-level_analysis_monthly',
 'surface_analysis_daily',
 'surface_forecast_hourly',
 'pressure-level_analysis_daily',
 'surface_forecast_monthly',
 'surface_analysis_hourly']
# DRS (data reference syntax): the attribute names encoded, in order, in each
# source id, e.g. "surface_analysis_daily" -> levelType/dataType/frequency.
drs="levelType_dataType_frequency"
# Re-parse the raw catalog YAML so entry descriptions can be read as plain
# dicts (metadata, args, parameters) without instantiating the sources.
catraw=yaml.safe_load(cat.text)
def map_drs(olds, sources_cfg=None, drs_spec=None):
    """Map a dotted source name to a flat metadata record for the datastore table.

    Parameters
    ----------
    olds : str
        Dotted source name; the second component (``olds.split('.')[1]``)
        must be a key under ``sources`` in the parsed catalog YAML.
    sources_cfg : dict, optional
        Parsed catalog YAML. Defaults to the module-level ``catraw``.
    drs_spec : str, optional
        Underscore-separated DRS attribute names. Defaults to the
        module-level ``drs``.

    Returns
    -------
    dict
        One record with selected metadata, the DRS attributes parsed from the
        source id, the detected asset format, and the urlpath.
    """
    if sources_cfg is None:
        sources_cfg = catraw
    if drs_spec is None:
        drs_spec = drs
    s = olds.split('.')[1]
    description = sources_cfg["sources"][s]
    metadata = description["metadata"]
    wanted = ("format", "grid_id", "member_id", "institution_id", "institution",
              "references", "simulation_id", "variables", "variable-long_names")
    filtered = {k: metadata[k] for k in wanted if k in metadata}
    # Default format; overridden below from the urlpath, or by catalog metadata.
    mapped = {"format": "netcdf"}
    mapped.update(filtered)
    # Fall back to the catalog's user-parameter definition for the variable list.
    if "variables" not in filtered and "parameters" in description:
        mapped["variables"] = description["parameters"]["variables"]["allowed"]
    # The source id itself encodes the DRS attributes, underscore-separated.
    id_parts = s.split('_')
    for idx, k in enumerate(drs_spec.split('_')):
        mapped[k] = id_parts[idx]
    # NOTE(review): "aggregation" is not a component of the default DRS spec
    # "levelType_dataType_frequency", so this branch looks dead — confirm.
    if "aggregation" in mapped and mapped["aggregation"].endswith('0'):
        mapped["aggregation"] += s.split('.')[-1]
    urlpath = description["args"]["urlpath"]
    # A source may list several paths; the format is detected from the first.
    probe = urlpath[0] if isinstance(urlpath, list) else urlpath
    if probe.startswith("reference"):
        mapped["format"] = "zarr"
    elif probe.endswith(".grib"):
        mapped["format"] = "grib"
    mapped["urlpath"] = description["args"]["urlpath"]
    return mapped
# Build one metadata record per harvested source and collect them in a table;
# bare expression so the notebook displays the DataFrame.
df=pd.DataFrame(list(map(map_drs,sources)))
df
format grid_id institution_id institution references simulation_id variable-long_names variables levelType dataType frequency urlpath
0 zarr N320 ECMWF-DKRZ Data from European Centre for Medium-Range Wea... Hersbach, H., Bell, B., Berrisford, P., Hiraha... ERA5 [Fraction of cloud cover, Specific cloud ice w... [cc, ciwc, clwc, crwc, cswc, d, o3, pv, q, r, ... pressure-level analysis daily reference::{{CATALOG_DIR}}/kerchunks/file/parq...
1 zarr N320 ECMWF-DKRZ Data from European Centre for Medium-Range Wea... Hersbach, H., Bell, B., Berrisford, P., Hiraha... ERA5 [Fraction of cloud cover, Specific cloud ice w... [cc, ciwc, clwc, crwc, cswc, d, o3, pv, q, r, ... pressure-level analysis hourly [reference::{{CATALOG_DIR}}/kerchunks/file/par...
2 zarr N320 ECMWF-DKRZ Data from European Centre for Medium-Range Wea... Hersbach, H., Bell, B., Berrisford, P., Hiraha... ERA5 [Fraction of cloud cover, Specific cloud ice w... [cc, ciwc, clwc, crwc, cswc, d, o3, pv, q, r, ... pressure-level analysis monthly reference::{{CATALOG_DIR}}/kerchunks/file/parq...
3 zarr N320 ECMWF-DKRZ Data from European Centre for Medium-Range Wea... Hersbach, H., Bell, B., Berrisford, P., Hiraha... ERA5 [100 metre U wind component, 100 metre V wind ... [100u, 100v, 10u, 10v, 2d, 2t, asn, ci, fal, f... surface analysis daily reference::{{CATALOG_DIR}}/kerchunks/file/parq...
4 zarr N320 ECMWF-DKRZ Data from European Centre for Medium-Range Wea... Hersbach, H., Bell, B., Berrisford, P., Hiraha... ERA5 [100 metre U wind component, 100 metre V wind ... [100u, 100v, 10u, 10v, 2d, 2t, asn, ci, fal, f... surface analysis hourly reference::{{CATALOG_DIR}}/kerchunks/file/parq...
5 zarr N320 ECMWF-DKRZ Data from European Centre for Medium-Range Wea... Hersbach, H., Bell, B., Berrisford, P., Hiraha... ERA5 [100 metre U wind component, 100 metre V wind ... [100u, 100v, 10si, 10u, 10v, 2d, 2t, asn, ci, ... surface analysis monthly reference::{{CATALOG_DIR}}/kerchunks/file/parq...
6 zarr N320 ECMWF-DKRZ Data from European Centre for Medium-Range Wea... Hersbach, H., Bell, B., Berrisford, P., Hiraha... ERA5 [10 metre wind gust since previous post-proces... [10fg, bld, blh, cape, cbh, cdir, cp, crr, csf... surface forecast hourly reference::{{CATALOG_DIR}}/kerchunks/file/parq...
7 zarr N320 ECMWF-DKRZ Data from European Centre for Medium-Range Wea... Hersbach, H., Bell, B., Berrisford, P., Hiraha... ERA5 [10 metre wind gust since previous post-proces... [10fg, bld, blh, cape, cbh, cdir, cp, crr, csf... surface forecast monthly reference::{{CATALOG_DIR}}/kerchunks/file/parq...
# Serialize the assembled records as the compressed CSV asset table of the
# intake-esm datastore (list-valued columns are written via their repr).
df.to_csv("dkrz_era5_disk.csv.gz", index=False)
# Fetch an existing intake-esm datastore description to use as the template
# for the DKRZ catalogue (fsspec OpenFile -> file-like object for json.load).
template=json.load(fs.open("https://raw.githubusercontent.com/eerie-project/intake_catalogues/main/jasmin/jasmin-catalogue.json").open())
descriptive_json=deepcopy(template)
# Point the datastore at our columns: the asset path lives in "urlpath" and
# its format is given per-row by the "format" column, so the template's fixed
# "format" key must be removed.
descriptive_json["assets"]['column_name']='urlpath'
del descriptive_json["assets"]['format']
descriptive_json["assets"]['format_column_name']='format'
# Every DataFrame column becomes a searchable attribute.
descriptive_json["attributes"]=[
    {
        'column_name': a, 'vocabulary': ''
    }
    for a in df.columns
]
# Group assets into datasets by the DRS attributes and merge the per-asset
# variable lists by union.
descriptive_json["aggregation_control"]["variable_column_name"]="variables"
descriptive_json["aggregation_control"]["groupby_attrs"]=drs.split('_')
descriptive_json["aggregation_control"]["aggregations"]=[
    {
        'type': 'union',
        'attribute_name': 'variables',
        'options': {}
    }
]
descriptive_json["id"]="dkrz-catalogue"
# NOTE(review): naive local time; consider datetime.datetime.now(tz=UTC) —
# confirm what downstream consumers of "last_updated" expect.
descriptive_json["last_updated"]=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
del descriptive_json["catalog_file"]
# Save this in github: stream the JSON straight to the file instead of
# building the whole string in memory first (same bytes as json.dumps).
with open("dkrz_era5_disk.json","w") as f:
    json.dump(
        descriptive_json,
        f,
        sort_keys=True,
        indent=4,
        separators=(',', ': ')
    )
# Open the in-memory description + table as an intake-esm datastore; the
# list-valued columns must be declared so intake-esm treats them as iterables.
esmcat=intake.open_esm_datastore(
    obj=dict(
        esmcat=descriptive_json,
        df=df
    ),
    columns_with_iterables=["variables","variable-long_names"]
)
/root/micromamba/envs/dkrzcatalog/lib/python3.12/site-packages/fastprogress/fastprogress.py:107: UserWarning: Couldn't import ipywidgets properly, progress bar will use console behavior
  warn("Couldn't import ipywidgets properly, progress bar will use console behavior")
esmcat

dkrz-catalogue catalog with 8 dataset(s) from 8 asset(s):

unique
format 1
grid_id 1
institution_id 1
institution 1
references 1
simulation_id 1
variable-long_names 110
variables 110
levelType 2
dataType 2
frequency 3
urlpath 50
derived_variables 0