# Copyright 2026 IPSL / CNRS / Sorbonne University
# Authors: Kishanthan Kingston
#
# This work is licensed under the Creative Commons
# Attribution-NonCommercial-ShareAlike 4.0 International License.
# To view a copy of this license, visit
# http://creativecommons.org/licenses/by-nc-sa/4.0/
import os
import argparse
import pandas as pd
import xarray as xr
# from tqdm import tqdm
import time
from IPSL_AID.logger import Logger
# Example usage:
#   python generate_all_data_ERA5.py --year_start 2015 --year_end 2015 --variable 2m_temperature --rename_var VAR_2T
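# [docs]  (looks like a documentation-link artifact from extraction; commented out so it is not evaluated as code)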
def parse_args(argv=None):
    """
    Parse command-line arguments.

    Parameters
    ----------
    argv : list of str, optional
        Argument strings to parse. When ``None`` (the default), argparse
        reads them from ``sys.argv[1:]``.

    Returns
    -------
    argparse.Namespace
        Parsed command line arguments as a namespace object with the
        attributes ``year_start``, ``year_end``, ``variable``,
        ``pressure_level`` (``None`` when not given) and ``rename_var``.

    Raises
    ------
    ValueError
        If the number of variables does not match the number of
        rename variables.
    SystemExit
        Raised by argparse when a required argument is missing or a value
        cannot be converted to the declared type.
    """
    parser = argparse.ArgumentParser(
        description="Generate ERA5 samples from monthly NetCDF files using CSV timestamps"
    )
    parser.add_argument(
        "--year_start", type=int, required=True, help="First year to process (2015)."
    )
    parser.add_argument(
        "--year_end", type=int, required=True, help="Last year to process (inclusive)."
    )
    parser.add_argument(
        "--variable",
        type=str,
        nargs="+",
        required=True,
        # Adjacent literals are concatenated as-is, so each sentence must
        # carry its own trailing space.
        help=(
            "ERA5 variable names corresponding to downloaded monthly data. "
            "Example: 2m_temperature 10m_u_component_of_wind"
        ),
    )
    parser.add_argument(
        "--pressure_level",
        type=str,
        nargs="+",
        required=False,
        help="Pressure levels to extract (500 750 850)",
    )
    parser.add_argument(
        "--rename_var",
        type=str,
        nargs="+",
        required=True,
        help=(
            "New variable name(s) to use in the output NetCDF file(s). "
            "Must match the number and order of --variable. "
            "Example: VAR_2T VAR_10U"
        ),
    )
    args = parser.parse_args(argv)

    # Each input variable needs exactly one output name; fail early instead
    # of silently truncating the pairing later on.
    if len(args.variable) != len(args.rename_var):
        raise ValueError("Number of --variable must match number of --rename_var")

    return args
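# [docs]  (looks like a documentation-link artifact from extraction; commented out so it is not evaluated as code)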
def _monthly_file_path(variable_root, variable, yyyy, month, pressure_levels):
    """Return the path of the monthly ERA5 file for one variable and month."""
    mm = f"{month:02d}"
    if pressure_levels:
        level_str = "_".join(pressure_levels)
        filename = f"{variable}_{level_str}_{yyyy}{mm}.nc"
    else:
        filename = f"{variable}_{yyyy}{mm}.nc"
    return os.path.join(variable_root, str(yyyy), filename)


def _extract_month(monthly_file, requested_times, pressure_levels, logger):
    """Open one monthly file and return the requested timestamps loaded in memory."""
    logger.info(f"Opening file: {monthly_file}")
    t0 = time.time()

    # The context manager closes the file handle when the block exits.
    # Derived datasets (drop_vars/sel/rename) are bound to a different name
    # so the handle that was actually opened is the one that gets closed.
    with xr.open_dataset(monthly_file) as source:
        ds = source.drop_vars(["number", "expver"], errors="ignore")

        if pressure_levels and "pressure_level" in ds.dims:
            ds = ds.sel(pressure_level=[int(p) for p in pressure_levels])

        logger.info(f"Opened in {time.time() - t0:.2f}s")

        # Some ERA5 datasets use "valid_time" instead of "time"
        if "valid_time" in ds.dims:
            ds = ds.rename({"valid_time": "time"})

        if "time" not in ds.dims:
            raise RuntimeError(f"No time dimension in {monthly_file}")

        logger.info(f"Dataset dims: {ds.dims}")
        logger.info(f"Selecting {len(requested_times)} timestamps...")

        t0 = time.time()
        # load() pulls the selected values into memory while the file is
        # still open, so the subset stays usable after the handle is closed.
        ds_sel = ds.sel(time=requested_times).load()
        logger.info(f"Selection done in {time.time() - t0:.2f}s")
        logger.info(f"Selected timesteps: {ds_sel.time.size}")

    return ds_sel


def main(logger):
    """
    Generate yearly ERA5 datasets from monthly NetCDF files.

    The function follows a structured workflow:

    1. Parse command-line arguments.
    2. Load a CSV file containing timestamps to extract.
    3. Loop over years and variables.
    4. Open monthly ERA5 NetCDF files using xarray.
    5. Extract requested timestamps.
    6. Concatenate monthly subsets into yearly datasets.
    7. Rename variables and write compressed NetCDF files.

    Parameters
    ----------
    logger : object
        Logger used for progress reporting. This function calls its
        ``info``, ``warning``, ``step`` and ``start_task`` methods.

    Raises
    ------
    ValueError
        If the number of ``--variable`` values differs from the number of
        ``--rename_var`` values.
    RuntimeError
        If a monthly file has neither a ``time`` nor a ``valid_time``
        dimension.

    Notes
    -----
    ERA5 data is stored monthly, so timestamps are grouped by month.

    The CSV path and the input/output root directories are hard-coded
    below. Monthly files that do not exist are skipped with a warning.
    Selection is label-based (``Dataset.sel``); per the xarray
    documentation this raises ``KeyError`` when a requested timestamp is
    absent from the file.

    Only the first data variable of the yearly dataset is renamed and given
    the compression encoding.
    """
    args = parse_args()

    year_start = args.year_start
    year_end = args.year_end
    variables = args.variable
    pressure_levels = args.pressure_level
    rename_vars = args.rename_var

    if len(variables) != len(rename_vars):
        raise ValueError("Number of --variable must match number of --rename_var")

    rename_dict = dict(zip(variables, rename_vars))

    # Paths
    # CSV containing timestamps to extract
    csv_path = (
        "/leonardo_work/EUHPC_D27_095/kkingston/IPSL-AID/data/dates_hours_1980_2022.csv"
    )
    # Root directory containing downloaded monthly ERA5 data
    base_data_root = "/leonardo_work/EUHPC_D27_095/kkingston/IPSL-AID/data"
    # Root directory where extracted yearly samples will be saved
    base_output_root = "/leonardo_work/EUHPC_D27_095/kkingston/IPSL-AID/data"

    # Load CSV
    logger.info("Loading CSV file")
    df = pd.read_csv(csv_path)

    # Combine date and hour columns into a full datetime column
    df["datetime"] = pd.to_datetime(df["date"] + " " + df["hour"])
    logger.info(f"Total timestamps: {len(df)}")

    # Main loop
    for year in range(year_start, year_end + 1):
        logger.start_task(
            "Processing year", description="Generating ERA5 samples", year=year
        )

        # Filter timestamps for current year
        df_year = df[df["datetime"].dt.year == year]
        if df_year.empty:
            logger.info("No timestamps found.")
            continue

        # Group timestamps by (year, month): ERA5 data is stored monthly
        grouped_month = df_year.groupby(
            [df_year["datetime"].dt.year, df_year["datetime"].dt.month]
        )
        n_months = len(grouped_month)
        logger.info(f"Months with data: {n_months}")

        for variable in variables:
            logger.step("Variable", f"Processing {variable}")

            # Directory containing monthly ERA5 files for this variable
            variable_root = os.path.join(base_data_root, f"data_{variable.upper()}")
            # Output directory for this variable
            output_root = os.path.join(
                base_output_root, f"data_FOURxDaily_{variable.upper()}"
            )
            os.makedirs(output_root, exist_ok=True)

            yearly_datasets = []

            # Loop over months
            for idx, ((yyyy, month), group) in enumerate(grouped_month, start=1):
                logger.info(
                    f"\n[{idx}/{n_months}] Processing month {yyyy}-{month:02d}"
                )

                monthly_file = _monthly_file_path(
                    variable_root, variable, yyyy, month, pressure_levels
                )
                if not os.path.exists(monthly_file):
                    logger.warning(f"Missing file: {monthly_file}")
                    continue

                # Extract only timestamps requested in the CSV
                ds_sel = _extract_month(
                    monthly_file, group["datetime"].values, pressure_levels, logger
                )
                if ds_sel.time.size > 0:
                    yearly_datasets.append(ds_sel)

            if not yearly_datasets:
                logger.info(f"No valid data for {variable} in {year}")
                continue

            logger.info("\nStarting concatenation...")
            logger.info(f"Number of monthly subsets: {len(yearly_datasets)}")
            t0 = time.time()

            logger.info(f"Initial dataset dims: {yearly_datasets[0].dims}")
            if len(yearly_datasets) == 1:
                ds_year = yearly_datasets[0]
            else:
                # One concat over all subsets avoids re-copying the
                # accumulated data once per month.
                ds_year = xr.concat(yearly_datasets, dim="time")
            logger.info(f"Concatenation finished in {time.time() - t0:.2f}s")

            logger.info("Sorting by time...")
            ds_year = ds_year.sortby("time")
            logger.info(f"Final dataset dims: {ds_year.dims}")

            # Rename variable
            new_name = rename_dict[variable]
            original_var = list(ds_year.data_vars)[0]
            ds_year = ds_year.rename({original_var: new_name})
            logger.info(f"Renamed {original_var} → {new_name}")

            output_file = os.path.join(output_root, f"samples_{year}.nc")
            logger.info(f"\nWriting NetCDF: {output_file}")
            t0 = time.time()
            encoding = {new_name: {"zlib": True, "complevel": 4, "dtype": "float32"}}
            ds_year.to_netcdf(output_file, encoding=encoding)
            logger.info(f"Write completed in {time.time() - t0:.2f}s")
            logger.info(f"Total timesteps: {len(ds_year.time)}")

    logger.info("\nAll requested years processed successfully.")
if __name__ == "__main__":
    # Script entry point: build a console logger, print the banner, then
    # run the generation workflow with that logger.
    logger = Logger(console_output=True)
    logger.show_header("ERA5 Dataset Generator")
    main(logger)