""" DuckDB database interface for Bronze (Raw) and Gold (Combined) layers. """ import duckdb import polars as pl from contextlib import contextmanager from utils.config_loader import settings @contextmanager def get_connection(db_path: str = None): """Context manager for DuckDB connections.""" db_path = db_path or settings.database.path con = duckdb.connect(db_path) try: yield con finally: con.close() def init_tables(con: duckdb.DuckDBPyConnection): """ Initializes the database schema following the Medallion architecture. """ # BRONZE LAYER (Raw API responses) smard_tables = [ "prices_raw", "load_forecast_raw", "gen_total_raw", "wind_onshore_raw", "wind_offshore_raw", "pv_raw", ] for table in smard_tables: con.execute( f"CREATE TABLE IF NOT EXISTS {table} (timestamp BIGINT PRIMARY KEY, value DOUBLE)" ) con.execute(""" CREATE TABLE IF NOT EXISTS weather_raw ( timestamp VARCHAR PRIMARY KEY, temperature DOUBLE, wind_speed DOUBLE, solar DOUBLE, sunshine DOUBLE, cloud_cover DOUBLE, precipitation DOUBLE ) """) # GOLD LAYER (Transformed and Joined Business Data) con.execute(""" CREATE TABLE IF NOT EXISTS combined ( timestamp TIMESTAMP PRIMARY KEY, price DOUBLE, load_forecast DOUBLE, generation_total DOUBLE, wind_total DOUBLE, pv DOUBLE, temperature DOUBLE, wind_speed DOUBLE, solar DOUBLE, sunshine DOUBLE, cloud_cover DOUBLE, precipitation DOUBLE ) """) def upsert_raw(con: duckdb.DuckDBPyConnection, table_name: str, df: pl.DataFrame): """Inserts raw data using explicit columns to match the target table schema.""" if df.is_empty(): return cols = con.execute(f"DESCRIBE {table_name}").pl()["column_name"].to_list() df_to_insert = df.select([c for c in cols if c in df.columns]) con.execute( f"INSERT INTO {table_name} SELECT * FROM df_to_insert ON CONFLICT (timestamp) DO NOTHING;" ) def upsert_combined(con: duckdb.DuckDBPyConnection, df: pl.DataFrame): """Inserts business-ready data into the Gold layer.""" if df.is_empty(): return con.execute( "INSERT INTO combined SELECT * FROM df ON CONFLICT (timestamp) DO NOTHING;" )