# strompreis/main.py
# Python · 119 lines · 4.1 KiB

"""
Main entry point for the Strompreis Pipeline CLI.
Coordinates data collection, transformation, and storage.
"""
import sys
import logging
import click
import polars as pl
from collectors import smard, weather
from transformators import transformator
from utils import database as db
from utils.config_loader import settings
# Root-logger setup, applied once at import time: INFO and above, with each
# record prefixed by timestamp, logger name and level. basicConfig() is a
# no-op if the root logger already has handlers attached.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Per-module logger; its name follows this module's import path.
logger = logging.getLogger(name=__name__)
@click.group()
def cli():
    """Strompreis Pipeline CLI Tools."""
    # Intentionally empty: the group only serves as the registration point
    # for subcommands declared with ``@cli.command()`` (e.g. ``run``).
def _fetch_bronze(region, lat, lon):
    """Fetch every raw SMARD series plus the raw weather data.

    Args:
        region: SMARD region code passed straight to ``smard.fetch_smard_data``.
        lat: Latitude passed to ``weather.fetch_weather``.
        lon: Longitude passed to ``weather.fetch_weather``.

    Returns:
        ``(smard_dfs, df_weather_raw)``. ``smard_dfs`` maps each bronze table
        name to the frame returned for its configured SMARD filter.
    """
    smard_queries = {
        "prices_raw": settings.smard.price_filter,
        "load_forecast_raw": settings.smard.load_forecast_filter,
        "gen_total_raw": settings.smard.generation_total_filter,
        "wind_onshore_raw": settings.smard.wind_onshore_filter,
        "wind_offshore_raw": settings.smard.wind_offshore_filter,
        "pv_raw": settings.smard.pv_filter,
    }
    smard_dfs = {
        table: smard.fetch_smard_data(filter_id, region)
        for table, filter_id in smard_queries.items()
    }
    df_weather_raw = weather.fetch_weather(lat, lon)
    return smard_dfs, df_weather_raw


def _to_gold_smard(df: pl.DataFrame, name: str) -> pl.DataFrame:
    """Convert one raw SMARD frame into a two-column gold frame.

    The result always has exactly the columns ``timestamp``
    (``Datetime("ms", "UTC")``) and *name* (``Float64``), whether or not *df*
    is empty. A stable schema keeps the chained full joins in ``_build_gold``
    free of stray raw columns that would otherwise collide.

    Args:
        df: Raw SMARD frame. NOTE(review): assumed to carry ``timestamp`` and
            ``value`` columns (as the original rename implied); confirm
            against ``collectors.smard``.
        name: Gold column name that the ``value`` series is renamed to.

    Returns:
        A frame with columns ``["timestamp", name]``.
    """
    if df.is_empty():
        return pl.DataFrame(
            schema={"timestamp": pl.Datetime("ms", "UTC"), name: pl.Float64}
        )
    return df.select(
        pl.col("timestamp").cast(pl.Datetime("ms")).dt.replace_time_zone("UTC"),
        pl.col("value").cast(pl.Float64).alias(name),
    )


def _build_gold(
    smard_dfs: dict[str, pl.DataFrame], df_weather_raw: pl.DataFrame
) -> pl.DataFrame:
    """Join the raw SMARD series and transformed weather into the gold view.

    All SMARD series are outer-joined on ``timestamp``. Onshore and offshore
    wind are collapsed into a single ``wind_total`` column. The result is then
    inner-joined with ``transformator.transform_weather(df_weather_raw)``.

    Args:
        smard_dfs: Bronze table name -> raw SMARD frame, as produced by
            ``_fetch_bronze``.
        df_weather_raw: Raw weather data, as produced by ``_fetch_bronze``.

    Returns:
        The gold frame, containing only timestamps present in both the SMARD
        union and the transformed weather data.
    """
    df_p = _to_gold_smard(smard_dfs["prices_raw"], "price")
    df_l = _to_gold_smard(smard_dfs["load_forecast_raw"], "load_forecast")
    df_g = _to_gold_smard(smard_dfs["gen_total_raw"], "generation_total")
    df_w_on = _to_gold_smard(smard_dfs["wind_onshore_raw"], "wind_on")
    df_w_off = _to_gold_smard(smard_dfs["wind_offshore_raw"], "wind_off")
    df_pv = _to_gold_smard(smard_dfs["pv_raw"], "pv")

    # A missing onshore/offshore value counts as zero, so wind_total is
    # defined whenever at least one of the two series has a row.
    df_wind = (
        df_w_on.join(df_w_off, on="timestamp", how="full", coalesce=True)
        .with_columns(pl.col("wind_on", "wind_off").fill_null(0))
        .select(
            "timestamp",
            (pl.col("wind_on") + pl.col("wind_off")).alias("wind_total"),
        )
    )

    df_weather_gold = transformator.transform_weather(df_weather_raw)

    # Full joins keep every SMARD timestamp, even if only one series has it.
    df_smard_all = (
        df_p.join(df_l, on="timestamp", how="full", coalesce=True)
        .join(df_g, on="timestamp", how="full", coalesce=True)
        .join(df_wind, on="timestamp", how="full", coalesce=True)
        .join(df_pv, on="timestamp", how="full", coalesce=True)
    )

    # Inner join: only timestamps covered by both SMARD and weather reach gold.
    return df_smard_all.join(df_weather_gold, on="timestamp", how="inner")


@cli.command()
@click.option("--region", help="SMARD region code (e.g., DE-LU).")
@click.option("--lat", type=float, help="Latitude for weather data.")
@click.option("--lon", type=float, help="Longitude for weather data.")
def run(region, lat, lon):
    """
    Executes the full ETL pipeline in four steps:
    1. Fetch raw SMARD and weather data from the APIs.
    2. Bronze Layer: Persist the raw data.
    3. Gold Layer: Transform and join data into a clean business view.
    4. Persist the Gold Layer.
    """
    # NOTE(review): region/lat/lon are None when the options are omitted;
    # presumably the collectors fall back to configured defaults. Confirm.
    logger.info("Initializing Pipeline...")
    try:
        logger.info("Step 1/4: Fetching raw data from APIs...")
        smard_dfs, df_weather_raw = _fetch_bronze(region, lat, lon)

        logger.info("Step 2/4: Persisting Bronze Layer...")
        with db.get_connection() as con:
            db.init_tables(con)
            for table, df in smard_dfs.items():
                db.upsert_raw(con, table, df)
            db.upsert_raw(con, "weather_raw", df_weather_raw)

        logger.info("Step 3/4: Transforming and joining data (Gold Layer)...")
        df_gold = _build_gold(smard_dfs, df_weather_raw)
        if df_gold.is_empty():
            # Usually means the two sources share no timestamps (or one is
            # empty); surface it instead of silently persisting nothing.
            logger.warning(
                "Gold layer is empty: no timestamps shared by SMARD and weather data."
            )

        logger.info("Step 4/4: Persisting Gold Layer...")
        with db.get_connection() as con:
            db.upsert_combined(con, df_gold)
        logger.info("Pipeline completed successfully.")
    except Exception as e:
        # Top-level CLI boundary: log the full traceback, then report failure
        # to the shell through a non-zero exit status.
        logger.exception("Pipeline execution failed: %s", e)
        sys.exit(1)
if __name__ == "__main__":
    # Run as a script: click parses sys.argv and dispatches to the chosen
    # subcommand (calling the group object is an alias for .main()).
    cli.main()