119 lines
4.1 KiB
Python
119 lines
4.1 KiB
Python
"""
|
|
Main entry point for the Strompreis Pipeline CLI.

Coordinates data collection (SMARD market data and weather), transformation
into a joined gold-layer view, and storage in the database.
|
|
"""
|
|
|
|
import sys
|
|
import logging
|
|
import click
|
|
import polars as pl
|
|
from common.collectors import smard, weather
|
|
from common.transformators import transformator
|
|
from common.utils import database as db
|
|
from common.utils.config_loader import settings
|
|
|
|
# Root logging setup: INFO and above, each record rendered as plain text
# prefixed with timestamp, logger name and level. Note that
# logging.basicConfig does nothing if the root logger already has handlers.
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

logger = logging.getLogger(__name__)
|
|
|
|
|
|
@click.group()
def cli() -> None:
    """Strompreis Pipeline CLI Tools."""
    # Intentionally empty: this group only exists so that sub-commands
    # (such as ``run`` below) can attach themselves via ``@cli.command()``.
|
|
|
|
|
|
# One entry per SMARD series:
#   (raw bronze table name, attribute on ``settings.smard`` holding the
#    series' filter id, column name used for the series in the gold view).
# Tuple order fixes the order in which series are fetched and upserted.
_SMARD_SERIES = (
    ("prices_raw", "price_filter", "price"),
    ("load_forecast_raw", "load_forecast_filter", "load_forecast"),
    ("gen_total_raw", "generation_total_filter", "generation_total"),
    ("wind_onshore_raw", "wind_onshore_filter", "wind_on"),
    ("wind_offshore_raw", "wind_offshore_filter", "wind_off"),
    ("pv_raw", "pv_filter", "pv"),
)


def _to_gold_smard(df, name):
    """Convert one raw SMARD frame to gold format.

    Casts ``timestamp`` to a millisecond ``Datetime`` and tags it as UTC
    (no shifting), then renames ``value`` to *name*. Any other columns of a
    non-empty frame are passed through unchanged.

    An empty input yields an empty frame with schema
    ``{"timestamp": Datetime("ms", "UTC"), name: Float64}`` so that the later
    timestamp joins still line up.
    """
    if df.is_empty():
        return pl.DataFrame(
            schema={"timestamp": pl.Datetime("ms", "UTC"), name: pl.Float64}
        )
    # NOTE(review): an integer timestamp is interpreted as epoch milliseconds
    # by this cast — presumably what the SMARD collector returns; verify.
    return df.with_columns(
        pl.col("timestamp").cast(pl.Datetime("ms")).dt.replace_time_zone("UTC")
    ).rename({"value": name})


def _fetch_bronze(region, lat, lon):
    """Fetch every configured SMARD series and the weather data.

    Returns ``(smard_dfs, df_weather_raw)``, where ``smard_dfs`` maps each raw
    table name from ``_SMARD_SERIES`` to the frame fetched for it.
    """
    # Resolve all filter ids up front, before any network call is made.
    filter_ids = {
        table: getattr(settings.smard, attr) for table, attr, _ in _SMARD_SERIES
    }
    smard_dfs = {
        table: smard.fetch_smard_data(filter_id, region)
        for table, filter_id in filter_ids.items()
    }
    df_weather_raw = weather.fetch_weather(lat, lon)
    return smard_dfs, df_weather_raw


def _persist_bronze(smard_dfs, df_weather_raw):
    """Upsert all raw frames (SMARD tables, then ``weather_raw``) over one connection.

    ``db.init_tables`` is called first on the same connection.
    """
    with db.get_connection() as con:
        db.init_tables(con)
        for table, df in smard_dfs.items():
            db.upsert_raw(con, table, df)
        db.upsert_raw(con, "weather_raw", df_weather_raw)


def _build_gold(smard_dfs, df_weather_raw):
    """Join all SMARD series and the transformed weather data into one frame.

    Output columns start with ``timestamp, price, load_forecast,
    generation_total, wind_total, pv``, followed by the columns produced by
    ``transformator.transform_weather``.
    """
    gold = {
        column: _to_gold_smard(smard_dfs[table], column)
        for table, _, column in _SMARD_SERIES
    }

    # Combine on- and offshore wind. After the full outer join, every null is
    # filled with 0, so a timestamp missing from one series counts as 0 there.
    df_wind = (
        gold["wind_on"]
        .join(gold["wind_off"], on="timestamp", how="full", coalesce=True)
        .fill_null(0)
        .with_columns((pl.col("wind_on") + pl.col("wind_off")).alias("wind_total"))
    )

    df_weather_gold = transformator.transform_weather(df_weather_raw)

    # Full outer joins keep every timestamp seen in any SMARD series.
    df_smard_all = gold["price"]
    for other in (
        gold["load_forecast"],
        gold["generation_total"],
        df_wind.select("timestamp", "wind_total"),
        gold["pv"],
    ):
        df_smard_all = df_smard_all.join(
            other, on="timestamp", how="full", coalesce=True
        )

    # Inner join: only timestamps present in both SMARD and weather data remain.
    return df_smard_all.join(df_weather_gold, on="timestamp", how="inner")


@cli.command()
@click.option("--region", help="SMARD region code (e.g., DE-LU).")
@click.option("--lat", type=float, help="Latitude for weather data.")
@click.option("--lon", type=float, help="Longitude for weather data.")
def run(region, lat, lon):
    """
    Executes the full ETL pipeline:
    1. Bronze Layer: Fetch and store raw data.
    2. Gold Layer: Transform and join data into a clean business view.
    """
    # (The docstring above is the click --help text; keep it user-facing.)
    # NOTE(review): none of the options declares a default, so an omitted
    # option arrives here as None and is passed straight to the collectors.
    # Confirm smard/weather handle None (e.g. by falling back to settings).
    logger.info("Initializing Pipeline...")

    try:
        logger.info("Step 1/4: Fetching raw data from APIs...")
        smard_dfs, df_weather_raw = _fetch_bronze(region, lat, lon)

        logger.info("Step 2/4: Persisting Bronze Layer...")
        _persist_bronze(smard_dfs, df_weather_raw)

        logger.info("Step 3/4: Transforming and joining data (Gold Layer)...")
        df_gold = _build_gold(smard_dfs, df_weather_raw)

        logger.info("Step 4/4: Persisting Gold Layer...")
        with db.get_connection() as con:
            db.upsert_combined(con, df_gold)

        logger.info("Pipeline completed successfully.")

    except Exception as e:
        # Top-level CLI boundary: record the full traceback, then report
        # failure to the shell via a non-zero exit status.
        logger.exception("Pipeline execution failed: %s", e)
        sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
    # Executed as a script: hand control to the click command group.
    cli()
|