"""
Main entry point for the Strompreis Pipeline CLI.
Coordinates data collection, transformation, and storage.
"""

import sys
import logging

import click
import polars as pl

from common.collectors import smard, weather
from common.transformators import transformator
from common.utils import database as db
from common.utils.config_loader import settings

# Structured logging configuration
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


def _to_gold_smard(df: pl.DataFrame, name: str) -> pl.DataFrame:
    """Convert one raw SMARD frame into its gold-layer shape.

    Args:
        df: Raw frame as returned by the SMARD collector. It is read for a
            ``timestamp`` and a ``value`` column.
        name: Column name that replaces ``value`` in the result.

    Returns:
        For an empty input, an empty frame with exactly two columns:
        ``timestamp`` (``Datetime("ms", "UTC")``) and ``name`` (``Float64``).
        Otherwise the input frame with ``timestamp`` cast to millisecond
        datetimes tagged as UTC and ``value`` renamed to ``name``; any other
        columns are left untouched.
    """
    if df.is_empty():
        # An empty result may not carry usable dtypes, so build a typed empty
        # frame; this keeps the later joins on "timestamp" well-defined.
        return pl.DataFrame(
            schema={"timestamp": pl.Datetime("ms", "UTC"), name: pl.Float64}
        )

    # NOTE(review): the cast presumably interprets raw epoch milliseconds;
    # confirm against the collector's output schema.
    utc_timestamp = (
        pl.col("timestamp").cast(pl.Datetime("ms")).dt.replace_time_zone("UTC")
    )
    return df.with_columns(utc_timestamp).rename({"value": name})


def _build_gold_frame(smard_dfs: dict, df_weather_raw: pl.DataFrame) -> pl.DataFrame:
    """Join all SMARD series and the weather data into the gold-layer frame.

    Args:
        smard_dfs: Raw SMARD frames keyed by bronze table name
            (``prices_raw``, ``load_forecast_raw``, ``gen_total_raw``,
            ``wind_onshore_raw``, ``wind_offshore_raw``, ``pv_raw``).
        df_weather_raw: Raw weather frame, passed to
            ``transformator.transform_weather``.

    Returns:
        The SMARD series full-joined on ``timestamp``, then inner-joined with
        the transformed weather frame on ``timestamp``.
    """
    df_p = _to_gold_smard(smard_dfs["prices_raw"], "price")
    df_l = _to_gold_smard(smard_dfs["load_forecast_raw"], "load_forecast")
    df_g = _to_gold_smard(smard_dfs["gen_total_raw"], "generation_total")
    df_w_on = _to_gold_smard(smard_dfs["wind_onshore_raw"], "wind_on")
    df_w_off = _to_gold_smard(smard_dfs["wind_offshore_raw"], "wind_off")
    df_pv = _to_gold_smard(smard_dfs["pv_raw"], "pv")

    # Nulls left by the full join are filled with 0 before the two wind
    # series are summed, so a one-sided timestamp still gets a wind_total.
    df_wind = df_w_on.join(
        df_w_off, on="timestamp", how="full", coalesce=True
    ).fill_null(0)
    df_wind = df_wind.with_columns(
        (pl.col("wind_on") + pl.col("wind_off")).alias("wind_total")
    )

    df_weather_gold = transformator.transform_weather(df_weather_raw)

    # Full joins keep every timestamp that appears in any SMARD series.
    df_smard_all = (
        df_p.join(df_l, on="timestamp", how="full", coalesce=True)
        .join(df_g, on="timestamp", how="full", coalesce=True)
        .join(
            df_wind.select("timestamp", "wind_total"),
            on="timestamp",
            how="full",
            coalesce=True,
        )
        .join(df_pv, on="timestamp", how="full", coalesce=True)
    )

    # The inner join keeps only timestamps present in both SMARD and weather.
    return df_smard_all.join(df_weather_gold, on="timestamp", how="inner")


# The docstrings of the click group and command below are shown as CLI help
# text, so their wording is user-facing.
@click.group()
def cli():
    """Strompreis Pipeline CLI Tools."""


@cli.command()
@click.option("--region", help="SMARD region code (e.g., DE-LU).")
@click.option("--lat", type=float, help="Latitude for weather data.")
@click.option("--lon", type=float, help="Longitude for weather data.")
def run(region, lat, lon):
    """
    Executes the full ETL pipeline:
    1. Bronze Layer: Fetch and store raw data.
    2. Gold Layer: Transform and join data into a clean business view.
    """
    # region, lat and lon are forwarded unchanged to the collectors; the
    # options declare no default, so an omitted option arrives here as None.
    logger.info("Initializing Pipeline...")

    try:
        logger.info("Step 1/4: Fetching raw data from APIs...")
        # Bronze table name -> SMARD filter id taken from the settings.
        smard_queries = {
            "prices_raw": settings.smard.price_filter,
            "load_forecast_raw": settings.smard.load_forecast_filter,
            "gen_total_raw": settings.smard.generation_total_filter,
            "wind_onshore_raw": settings.smard.wind_onshore_filter,
            "wind_offshore_raw": settings.smard.wind_offshore_filter,
            "pv_raw": settings.smard.pv_filter,
        }
        smard_dfs = {
            tbl: smard.fetch_smard_data(fid, region)
            for tbl, fid in smard_queries.items()
        }
        df_weather_raw = weather.fetch_weather(lat, lon)

        logger.info("Step 2/4: Persisting Bronze Layer...")
        with db.get_connection() as con:
            db.init_tables(con)
            for table, df in smard_dfs.items():
                db.upsert_raw(con, table, df)
            db.upsert_raw(con, "weather_raw", df_weather_raw)

        logger.info("Step 3/4: Transforming and joining data (Gold Layer)...")
        df_gold = _build_gold_frame(smard_dfs, df_weather_raw)

        logger.info("Step 4/4: Persisting Gold Layer...")
        with db.get_connection() as con:
            db.upsert_combined(con, df_gold)

        logger.info("Pipeline completed successfully.")

    except Exception as e:
        # Top-level boundary of the CLI command: log the failure with its
        # traceback and exit with a non-zero status.
        logger.exception("Pipeline execution failed: %s", e)
        sys.exit(1)


if __name__ == "__main__":
    cli()