"""
SPDX-FileCopyrightText: 2025 Koen van Greevenbroek
SPDX-License-Identifier: GPL-3.0-or-later
"""
from pathlib import Path
from typing import Iterable
import geopandas as gpd
import numpy as np
import pandas as pd
MONTH_LENGTHS = np.array([31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31], dtype=float)
MONTH_ENDS = np.cumsum(MONTH_LENGTHS)
DAYS_IN_YEAR = float(MONTH_ENDS[-1])
def _month_index_for_day(day: float) -> int:
"""Return month index (0-based) for day in [0, 365)."""
# add tiny epsilon to avoid edge cases at month boundaries
return int(np.searchsorted(MONTH_ENDS, day + 1e-9))
[docs]
def compute_month_overlaps(start_day: float, length_days: float) -> np.ndarray:
"""Return array of day overlaps per month for given season.
start_day is 1-indexed (GAEZ convention). length_days can exceed 365; values
above one year are capped at 365 to avoid infinite wrap.
"""
if not np.isfinite(start_day) or not np.isfinite(length_days):
return np.zeros(12)
if length_days <= 0:
return np.zeros(12)
start = (float(start_day) - 1.0) % DAYS_IN_YEAR
remaining = min(float(length_days), DAYS_IN_YEAR)
overlaps = np.zeros(12)
position = start
while remaining > 1e-6:
if position >= DAYS_IN_YEAR:
position -= DAYS_IN_YEAR
month_idx = _month_index_for_day(position)
month_end = MONTH_ENDS[month_idx]
available = month_end - position
used = min(available, remaining)
overlaps[month_idx] += used
remaining -= used
position = (position + used) % DAYS_IN_YEAR
return overlaps
[docs]
def build_basin_region_shares(
basins_path: str,
regions_path: str,
) -> pd.DataFrame:
basins = gpd.read_file(basins_path)[["BASIN_ID", "geometry"]]
regions = gpd.read_file(regions_path)[["region", "geometry"]]
if basins.crs != regions.crs:
regions = regions.to_crs(basins.crs)
# Project to equal-area for accurate area calculation
area_crs = "EPSG:6933"
basins_eq = basins.to_crs(area_crs)
regions_eq = regions.to_crs(area_crs)
basin_area = basins_eq.set_index("BASIN_ID").geometry.area / 1e6 # km²
intersections = gpd.overlay(regions_eq, basins_eq, how="intersection")
if intersections.empty:
return pd.DataFrame(columns=["region", "basin_id", "share", "area_km2"])
intersections["area_km2"] = intersections.geometry.area / 1e6
shares = intersections.groupby(["region", "BASIN_ID"], as_index=False)[
"area_km2"
].sum()
shares = shares.rename(columns={"BASIN_ID": "basin_id"})
shares["share"] = shares.apply(
lambda row: row["area_km2"] / basin_area.at[row["basin_id"]], axis=1
)
shares = shares[shares["share"] > 1e-6]
return shares
[docs]
def compute_region_monthly_water(
shares: pd.DataFrame,
monthly_basin: pd.DataFrame,
) -> pd.DataFrame:
required = {"basin_id", "month", "blue_water_availability_m3"}
missing = required - set(monthly_basin.columns)
if missing:
raise ValueError(
"Monthly basin data missing columns: " + ", ".join(sorted(missing))
)
df = shares.merge(monthly_basin, on="basin_id", how="inner")
if df.empty:
return pd.DataFrame(columns=["region", "month", "water_available_m3"])
df["weighted"] = df["share"] * df["blue_water_availability_m3"]
region_month = (
df.groupby(["region", "month"], as_index=False)["weighted"]
.sum()
.rename(columns={"weighted": "water_available_m3"})
.sort_values(["region", "month"])
)
return region_month
[docs]
def load_crop_growing_seasons(
crop_files: Iterable[str],
) -> pd.DataFrame:
records = []
for path_str in crop_files:
path = Path(path_str)
stem = path.stem
if "_" not in stem:
continue
crop, water_supply = stem.split("_", 1)
df = pd.read_csv(path)
pivot = (
df.pivot(
index=["region", "resource_class"], columns="variable", values="value"
)
.rename_axis(columns=None)
.reset_index()
)
pivot = pivot.dropna(
subset=[
"region",
"suitable_area",
"growing_season_start_day",
"growing_season_length_days",
]
)
pivot = pivot[pivot["suitable_area"] > 0]
if pivot.empty:
continue
pivot["resource_class"] = pivot["resource_class"].astype(int)
for column in [
"suitable_area",
"growing_season_start_day",
"growing_season_length_days",
]:
pivot[column] = pd.to_numeric(pivot[column], errors="coerce")
grouped = pivot.groupby("region")
for region, group in grouped:
weight = group["suitable_area"].sum()
if weight <= 0:
continue
start = (
group["growing_season_start_day"] * group["suitable_area"]
).sum() / weight
length = (
group["growing_season_length_days"] * group["suitable_area"]
).sum() / weight
records.append(
{
"region": region,
"crop": crop,
"water_supply": water_supply,
"total_area": weight,
"growing_season_start_day": start,
"growing_season_length_days": length,
}
)
if not records:
return pd.DataFrame(
columns=[
"region",
"crop",
"water_supply",
"total_area",
"growing_season_start_day",
"growing_season_length_days",
]
)
out = pd.DataFrame(records)
return out
[docs]
def compute_region_growing_water(
region_month_water: pd.DataFrame,
crop_seasons: pd.DataFrame,
) -> pd.DataFrame:
if region_month_water.empty:
return pd.DataFrame(
columns=[
"region",
"annual_water_available_m3",
"growing_season_water_available_m3",
]
)
monthly = region_month_water.set_index(["region", "month"]) # MultiIndex
annual = (
region_month_water.groupby("region")["water_available_m3"]
.sum()
.rename("annual_water_available_m3")
)
if crop_seasons.empty:
df = annual.to_frame().reset_index()
df["growing_season_water_available_m3"] = 0.0
return df
irrigated = crop_seasons[crop_seasons["water_supply"] == "i"]
if irrigated.empty:
irrigated = crop_seasons
# Prepare container for month demand fractions per region
region_month_demand = {
region: np.zeros(12) for region in crop_seasons["region"].unique()
}
region_total_area = {region: 0.0 for region in crop_seasons["region"].unique()}
for _, row in irrigated.iterrows():
region = row["region"]
overlaps = compute_month_overlaps(
row["growing_season_start_day"], row["growing_season_length_days"]
)
if overlaps.sum() <= 0:
continue
area = row["total_area"]
region_total_area[region] = region_total_area.get(region, 0.0) + area
fraction = overlaps / MONTH_LENGTHS
region_month_demand[region] = (
region_month_demand.get(region, np.zeros(12)) + area * fraction
)
growing_records = []
for region, total_area in region_total_area.items():
demand = region_month_demand.get(region)
if demand is None or total_area <= 0:
demand_fraction = np.zeros(12)
else:
demand_fraction = np.minimum(1.0, demand / max(total_area, 1e-9))
# Get region monthly water, fill missing months with 0
try:
region_series = (
monthly.loc[region]["water_available_m3"]
.reindex(range(1, 13), fill_value=0.0)
.to_numpy(dtype=float)
)
except KeyError:
region_series = np.zeros(12)
growing_water = float(np.dot(region_series, demand_fraction))
growing_records.append(
{
"region": region,
"growing_season_water_available_m3": growing_water,
"reference_irrigated_area": total_area,
}
)
growing_df = pd.DataFrame(growing_records)
combined = (
annual.to_frame().reset_index().merge(growing_df, on="region", how="left")
)
combined["growing_season_water_available_m3"] = combined[
"growing_season_water_available_m3"
].fillna(0.0)
combined["reference_irrigated_area"] = combined["reference_irrigated_area"].fillna(
0.0
)
return combined
if __name__ == "__main__":
shapefile_path: str = snakemake.input.shapefile # type: ignore[name-defined]
regions_path: str = snakemake.input.regions # type: ignore[name-defined]
monthly_csv: str = snakemake.input.monthly # type: ignore[name-defined]
crop_files = list(snakemake.input.crop_yields) # type: ignore[name-defined]
monthly_basin_df = pd.read_csv(monthly_csv)
shares_df = build_basin_region_shares(shapefile_path, regions_path)
region_month_df = compute_region_monthly_water(shares_df, monthly_basin_df)
crop_seasons_df = load_crop_growing_seasons(crop_files)
region_growing_df = compute_region_growing_water(region_month_df, crop_seasons_df)
monthly_out = Path(snakemake.output.monthly_region) # type: ignore[name-defined]
monthly_out.parent.mkdir(parents=True, exist_ok=True)
region_month_df.to_csv(monthly_out, index=False)
growing_out = Path(snakemake.output.region_growing) # type: ignore[name-defined]
growing_out.parent.mkdir(parents=True, exist_ok=True)
region_growing_df.to_csv(growing_out, index=False)