Merge branch 'dev/perform-refactoring'

* dev/perform-refactoring:
  chore: update tests to adaped to new structure
  refactor: modernize project and improve error handling and documentation
This commit is contained in:
Patryk Hegenberg 2026-02-11 15:49:07 +01:00
commit 12751aa7b8
22 changed files with 1921 additions and 188 deletions

17
Dockerfile Normal file
View file

@ -0,0 +1,17 @@
FROM python:3.11-slim-bookworm
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvbin/uv
WORKDIR /app
COPY pyproject.toml uv.lock ./
RUN /uvbin/uv pip install --system --requirement pyproject.toml
COPY . .
RUN mkdir -p output config .cache && chmod -R 777 output config .cache
ENV PYTHONPATH=.
ENV INTERVAL=3600
ENV UV_CACHE_DIR=/app/.cache
CMD ["/uvbin/uv", "run", "python", "main.py", "run"]

64
README.md Normal file
View file

@ -0,0 +1,64 @@
# Strompreis & Netz Pipeline
Automatisierte End-to-End Daten-Pipeline zur Erfassung, Analyse und Visualisierung von Strommarkt- und Wetterdaten.
## 🏗 Architektur (Medallion Prinzip)
Das Projekt folgt einer sauberen Data-Engineering-Struktur unter Verwendung von DuckDB und Polars:
1. **Bronze Layer (Raw):** Unveränderte API-Antworten von SMARD und Bright Sky werden als historische Fakten gespeichert.
2. **Gold Layer (Combined):** Transformierte, bereinigte und über Zeitstempel zusammengeführte Daten (Inner Join), die direkt für Analysen und das Dashboard bereitstehen.
## 🚀 Features
- **Datensammlung:** Automatisiertes Abrufen von:
- Strompreisen, Netzlast, Gesamterzeugung, Wind- & Solar-Erzeugung (SMARD API).
- Umfassenden Wetterdaten (Temperatur, Wind, Solarstrahlung, etc. via Bright Sky API).
- **Dashboard:** Interaktive Visualisierung mit Streamlit (Preise, Energiemix, Wetter-Details).
- **REST-API:** FastAPI-Schnittstelle für den programmatischen Zugriff auf aktuelle Daten.
- **Orchestrierung:** Vollständige Docker-Integration inklusive eines automatisierten Schedulers.
## 🛠 Installation & Betrieb
### Mit Docker (Empfohlen)
Das gesamte System (Pipeline, API, Dashboard) wird mit einem Befehl gestartet:
```bash
docker-compose up --build
```
- **Dashboard:** `http://localhost:8501`
- **API:** `http://localhost:8000/docs`
### Lokal (Entwicklung)
1. **Abhängigkeiten installieren:**
```bash
uv sync
```
2. **Pipeline manuell ausführen:**
```bash
uv run python main.py run
```
3. **Dashboard starten:**
```bash
uv run streamlit run dashboard/app.py
```
## ⚙️ Konfiguration
Die Konfiguration erfolgt über `config/config.yaml` oder Umgebungsvariablen mit dem Präfix `STROM_`:
- `STROM_SMARD__REGION`: Region-Code (Standard: DE-LU)
- `STROM_DATABASE__PATH`: Pfad zur DuckDB Datei
- `INTERVAL`: Ausführungsintervall des Schedulers im Docker-Container (in Sekunden).
## 🧪 Tests
```bash
uv run pytest
```
## Lizenz
MIT

53
api/main.py Normal file
View file

@ -0,0 +1,53 @@
"""
REST API for accessing processed electricity price and network data.
"""
import sys
from pathlib import Path
from fastapi import FastAPI, HTTPException
# Add project root to sys.path
project_root = str(Path(__file__).parent.parent)
if project_root not in sys.path:
sys.path.append(project_root)
from utils import database as db
from utils.config_loader import settings
app = FastAPI(
title="Strompreis API",
description="Provides access to cleaned and joined electricity price, network, and weather data."
)
@app.get("/current-price")
async def get_current_price():
"""
Returns the most recent price record from the Gold (combined) layer.
"""
try:
with db.get_connection() as con:
res = con.execute(
"SELECT timestamp, price FROM combined ORDER BY timestamp DESC LIMIT 1"
).fetchone()
if not res:
raise HTTPException(status_code=404, detail="No data available")
return {
"timestamp": res[0],
"price_eur_mwh": res[1],
"price_ct_kwh": round(res[1] / 10, 2)
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
@app.get("/status")
async def get_status():
"""Returns basic system configuration and status."""
return {
"region": settings.smard.region,
"database": settings.database.path,
"status": "operational"
}

View file

@ -1,29 +0,0 @@
from datetime import datetime
import time
import polars as pl
from utils import request_utils
def fetch_prices(region: str = "DE-LU", filter_id: int = 4169) -> pl.DataFrame:
current_time = 0
closest = 0
now_ms = int(time.time()) * 1000
index_url = (
f"https://www.smard.de/app/chart_data/{filter_id}/{region}/index_hour.json"
)
timestamps = request_utils.make_requests(index_url, timeout=20)
closest = min(timestamps.get("timestamps", []), key=lambda x: abs(x - now_ms))
current_time = closest
url = f"https://www.smard.de/app/chart_data/{filter_id}/{region}/{filter_id}_{region}_hour_{current_time}.json"
data = request_utils.make_requests(url, timeout=20)
series = data.get("series", [])
clean_series = [row for row in series if row[1] is not None]
df = pl.DataFrame(
{
"timestamp": [row[0] for row in clean_series],
"price": [row[1] for row in clean_series],
}
)
return df

58
collectors/smard.py Normal file
View file

@ -0,0 +1,58 @@
"""
Collector for SMARD (Electricity Market Data) API.
"""
import time
import logging
import polars as pl
from utils import request_utils
from utils.config_loader import settings
logger = logging.getLogger(__name__)
def fetch_smard_data(filter_id: int, region: str = None) -> pl.DataFrame:
"""
Fetches time-series data from the SMARD API for a specific filter.
Args:
filter_id: The SMARD data identifier (e.g., 4169 for price).
region: The region code (defaults to config value).
Returns:
A Polars DataFrame with 'timestamp' (ms) and 'value'.
"""
region = region or settings.smard.region
base_url = settings.smard.base_url
now_ms = int(time.time()) * 1000
index_url = f"{base_url}/{filter_id}/{region}/index_hour.json"
try:
index_data = request_utils.make_requests(index_url)
if not index_data or "timestamps" not in index_data:
logger.warning(f"No index data for filter {filter_id}")
return pl.DataFrame()
closest_ts = min(index_data["timestamps"], key=lambda x: abs(x - now_ms))
data_url = (
f"{base_url}/{filter_id}/{region}/"
f"{filter_id}_{region}_hour_{closest_ts}.json"
)
payload = request_utils.make_requests(data_url)
series = payload.get("series", [])
clean_series = [row for row in series if row[1] is not None]
if not clean_series:
return pl.DataFrame()
return pl.DataFrame(
{
"timestamp": [row[0] for row in clean_series],
"value": [row[1] for row in clean_series],
},
schema={"timestamp": pl.Int64, "value": pl.Float64}
)
except Exception as e:
logger.error(f"Failed to fetch SMARD filter {filter_id}: {e}")
return pl.DataFrame()

View file

@ -1,11 +1,30 @@
"""
Collector for Bright Sky (Weather) API.
"""
import logging
from datetime import datetime, timedelta, timezone
from utils import request_utils
import polars as pl
from utils import request_utils
from utils.config_loader import settings
logger = logging.getLogger(__name__)
def fetch_weather(lat: float = None, lon: float = None) -> pl.DataFrame:
"""
Fetches historical weather data from Bright Sky API.
Filters for the specific columns defined in the Bronze schema.
Returns:
A Polars DataFrame containing filtered raw weather parameters.
"""
lat = lat or settings.brightsky.lat
lon = lon or settings.brightsky.lon
url = settings.brightsky.base_url
def fetch_weather(lat: float = 52.52, lon: float = 13.41) -> pl.DataFrame:
url = "https://api.brightsky.dev/weather"
headers = {"Accept": "application/json"}
current_utc = datetime.now(timezone.utc)
payload = {
"date": (current_utc - timedelta(hours=72)).isoformat(),
@ -15,8 +34,9 @@ def fetch_weather(lat: float = 52.52, lon: float = 13.41) -> pl.DataFrame:
"units": "dwd",
"tz": "Etc/UTC",
}
data = request_utils.make_requests(url, headers, payload, 20)
relevant_fields = [
schema_cols = [
"timestamp",
"temperature",
"wind_speed",
"solar",
@ -24,13 +44,20 @@ def fetch_weather(lat: float = 52.52, lon: float = 13.41) -> pl.DataFrame:
"cloud_cover",
"precipitation",
]
critical_fields = ["temperature", "wind_speed"]
weather_data = [
{k: v for k, v in entry.items() if k in relevant_fields or k in ["timestamp"]}
for entry in data["weather"]
if all(entry.get(field) is not None for field in critical_fields)
]
try:
data = request_utils.make_requests(url, params=payload)
weather_list = data.get("weather", [])
df_weather = pl.DataFrame(weather_data)
return df_weather
if not weather_list:
logger.warning("No weather data returned from API.")
return pl.DataFrame()
df = pl.DataFrame(weather_list)
available_cols = [c for c in schema_cols if c in df.columns]
return df.select(available_cols)
except Exception as e:
logger.error(f"Failed to fetch weather data: {e}")
return pl.DataFrame()

View file

@ -0,0 +1,12 @@
smard:
region: "DE-LU"
filter_id: 4169
base_url: "https://www.smard.de/app/chart_data"
brightsky:
lat: 52.52
lon: 13.41
base_url: "https://api.brightsky.dev/weather"
database:
path: "output/pipeline.duckdb"

View file

@ -0,0 +1,73 @@
import sys
from pathlib import Path
import duckdb
import polars as pl
import streamlit as st
project_root = str(Path(__file__).parent.parent)
if project_root not in sys.path:
sys.path.append(project_root)
from utils.config_loader import settings
st.set_page_config(page_title="Strompreis & Netz Dashboard", layout="wide")
st.title("⚡ Strompreis, Netz & Wetter Dashboard")
@st.cache_data(ttl=300)
def load_data():
try:
con = duckdb.connect(settings.database.path)
df = con.execute("SELECT * FROM combined ORDER BY timestamp DESC").pl()
con.close()
return df.fill_null(0)
except Exception as e:
st.error(f"Fehler beim Laden der Daten: {e}")
return pl.DataFrame()
df = load_data()
if df.is_empty():
st.warning("Keine Daten gefunden. Bitte Pipeline starten.")
else:
latest = df.head(1).to_dicts()[0]
m1, m2, m3, m4 = st.columns(4)
m1.metric("Preis", f"{latest['price']:.2f} €/MWh")
m2.metric("Netzlast", f"{latest['load_forecast'] / 1000:.1f} GW")
m3.metric("Erneuerbare", f"{(latest['wind_total'] + latest['pv']) / 1000:.1f} GW")
m4.metric("Außentemp.", f"{latest['temperature']:.1f} °C")
tab1, tab2, tab3 = st.tabs(["Markt & Netz", "Wetter-Details", "Rohdaten"])
with tab1:
st.subheader("Strompreis vs. Netzlast")
pdf = df.to_pandas()
st.line_chart(pdf.set_index("timestamp")[["price", "load_forecast"]])
st.subheader("Erzeugung Mix (Wind & Solar)")
st.area_chart(pdf.set_index("timestamp")[["wind_total", "pv"]])
with tab2:
st.subheader("Detaillierte Wetterdaten")
w_col1, w_col2 = st.columns(2)
with w_col1:
st.info("Temperaturverlauf")
st.line_chart(pdf.set_index("timestamp")["temperature"])
with w_col2:
st.info("Windgeschwindigkeit")
st.line_chart(pdf.set_index("timestamp")["wind_speed"])
st.info("Weitere Wetterfaktoren")
available_weather = [
c
for c in ["solar", "sunshine", "cloud_cover", "precipitation"]
if c in df.columns
]
if available_weather:
st.line_chart(pdf.set_index("timestamp")[available_weather])
with tab3:
st.dataframe(df)

36
docker-compose.yaml Normal file
View file

@ -0,0 +1,36 @@
services:
pipeline:
build: .
user: "1000:1000"
volumes:
- ./output:/app/output:z
- ./config:/app/config:z
command: ["/bin/bash", "scripts/scheduler.sh"]
environment:
- INTERVAL=3600
restart: unless-stopped
api:
build: .
user: "1000:1000"
ports:
- "8000:8000"
volumes:
- ./output:/app/output:z
- ./config:/app/config:z
command: ["python", "-m", "uvicorn", "api.main:app", "--host", "0.0.0.0", "--port", "8000"]
restart: unless-stopped
dashboard:
build: .
user: "1000:1000"
ports:
- "8501:8501"
volumes:
- ./output:/app/output:z
- ./config:/app/config:z
environment:
- STREAMLIT_BROWSER_GATHER_USAGE_STATS=false
- STREAMLIT_USAGE_STATS_ENABLED=false
command: ["streamlit", "run", "dashboard/app.py", "--server.port", "8501", "--server.address", "0.0.0.0"]
restart: unless-stopped

129
main.py
View file

@ -1,10 +1,18 @@
"""
Main entry point for the Strompreis Pipeline CLI.
Coordinates data collection, transformation, and storage.
"""
import sys
from collectors import pricing, weather
import logging
import click
import polars as pl
from collectors import smard, weather
from transformators import transformator
from utils import database as db
import logging
from utils.config_loader import settings
# Structured logging configuration
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
@ -12,39 +20,100 @@ logging.basicConfig(
logger = logging.getLogger(__name__)
def main():
logger.info("Starte Strompreis-Pipeline")
@click.group()
def cli():
"""Strompreis Pipeline CLI Tools."""
pass
logger.info("Hole Preisdaten...")
df_prices_raw = pricing.fetch_prices()
logger.info(f"Preise: {len(df_prices_raw)} Zeilen")
logger.info("Hole Wetterdaten...")
df_weather_raw = weather.fetch_weather()
logger.info(f"Wetter: {len(df_weather_raw)} Zeilen")
@cli.command()
@click.option("--region", help="SMARD region code (e.g., DE-LU).")
@click.option("--lat", type=float, help="Latitude for weather data.")
@click.option("--lon", type=float, help="Longitude for weather data.")
def run(region, lat, lon):
"""
Executes the full ETL pipeline:
1. Bronze Layer: Fetch and store raw data.
2. Gold Layer: Transform and join data into a clean business view.
"""
logger.info("Initializing Pipeline...")
logger.info("Transformiere Daten...")
df_prices_clean = transformator.transform_prices(df_prices_raw)
df_weather_clean = transformator.transform_weather(df_weather_raw)
df_joined = transformator.join_dataframes(df_prices_clean, df_weather_clean)
logger.info(f"Joined: {len(df_joined)} Zeilen")
try:
logger.info("Step 1/4: Fetching raw data from APIs...")
smard_queries = {
"prices_raw": settings.smard.price_filter,
"load_forecast_raw": settings.smard.load_forecast_filter,
"gen_total_raw": settings.smard.generation_total_filter,
"wind_onshore_raw": settings.smard.wind_onshore_filter,
"wind_offshore_raw": settings.smard.wind_offshore_filter,
"pv_raw": settings.smard.pv_filter,
}
logger.info("Speichere in DuckDB...")
smard_dfs = {
tbl: smard.fetch_smard_data(fid, region)
for tbl, fid in smard_queries.items()
}
df_weather_raw = weather.fetch_weather(lat, lon)
logger.info("Step 2/4: Persisting Bronze Layer...")
with db.get_connection() as con:
db.init_tables(con)
db.upsert_prices(con, df_prices_raw)
db.upsert_prices_clean(con, df_prices_clean)
db.upsert_weather(con, df_weather_raw)
db.upsert_combined(con, df_joined)
logger.info("Erfolgreich zur Datenbank hinzugefügt.")
for table, df in smard_dfs.items():
db.upsert_raw(con, table, df)
db.upsert_raw(con, "weather_raw", df_weather_raw)
logger.info("Step 3/4: Transforming and joining data (Gold Layer)...")
def to_gold_smard(df, name):
"""Internal helper to convert raw SMARD df to gold format."""
if df.is_empty():
return pl.DataFrame(
schema={"timestamp": pl.Datetime("ms", "UTC"), name: pl.Float64}
)
return df.with_columns(
pl.col("timestamp").cast(pl.Datetime("ms")).dt.replace_time_zone("UTC")
).rename({"value": name})
df_p = to_gold_smard(smard_dfs["prices_raw"], "price")
df_l = to_gold_smard(smard_dfs["load_forecast_raw"], "load_forecast")
df_g = to_gold_smard(smard_dfs["gen_total_raw"], "generation_total")
df_w_on = to_gold_smard(smard_dfs["wind_onshore_raw"], "wind_on")
df_w_off = to_gold_smard(smard_dfs["wind_offshore_raw"], "wind_off")
df_pv = to_gold_smard(smard_dfs["pv_raw"], "pv")
df_wind = df_w_on.join(
df_w_off, on="timestamp", how="full", coalesce=True
).fill_null(0)
df_wind = df_wind.with_columns(
(pl.col("wind_on") + pl.col("wind_off")).alias("wind_total")
)
df_weather_gold = transformator.transform_weather(df_weather_raw)
df_smard_all = (
df_p.join(df_l, on="timestamp", how="full", coalesce=True)
.join(df_g, on="timestamp", how="full", coalesce=True)
.join(
df_wind[["timestamp", "wind_total"]],
on="timestamp",
how="full",
coalesce=True,
)
.join(df_pv, on="timestamp", how="full", coalesce=True)
)
df_gold = df_smard_all.join(df_weather_gold, on="timestamp", how="inner")
logger.info("Step 4/4: Persisting Gold Layer...")
with db.get_connection() as con:
db.upsert_combined(con, df_gold)
logger.info("Pipeline completed successfully.")
except Exception as e:
logger.error(f"Pipeline execution failed: {e}", exc_info=True)
sys.exit(1)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
logger.info("Pipeline durch Benutzer gestoppt")
sys.exit(0)
except Exception as e:
logger.error(f"Pipeline fehlgeschlagen: {e}", exc_info=True)
sys.exit(1)
cli()

View file

@ -5,8 +5,37 @@ description = "Add your description here"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"click>=8.3.1",
"duckdb>=1.4.4",
"fastapi>=0.128.7",
"polars>=1.38.1",
"pyarrow>=23.0.0",
"pydantic>=2.12.5",
"pydantic-settings>=2.12.0",
"pyyaml>=6.0.3",
"requests>=2.32.5",
"streamlit>=1.54.0",
"tenacity>=9.1.4",
"uvicorn>=0.40.0",
]
[tool.ruff]
line-length = 88
target-version = "py311"
[tool.ruff.lint]
select = ["E", "F", "I", "W", "C90", "B"]
ignore = []
[tool.pytest.ini_options]
pythonpath = ["."]
testpaths = ["tests"]
[dependency-groups]
dev = [
"httpx>=0.28.1",
"pytest>=9.0.2",
"pytest-mock>=3.15.1",
"requests-mock>=1.12.1",
"ruff>=0.15.0",
]

8
scripts/scheduler.sh Normal file
View file

@ -0,0 +1,8 @@
#!/bin/bash
echo "Starte Pipeline Scheduler (Intervall: $INTERVAL Sekunden)"
while true; do
echo "Führe Pipeline aus: $(date)"
python main.py run
echo "Pipeline beendet. Warte $INTERVAL Sekunden..."
sleep ${INTERVAL:-3600}
done

20
tests/test_api.py Normal file
View file

@ -0,0 +1,20 @@
from fastapi.testclient import TestClient
from api.main import app
import pytest
client = TestClient(app)
def test_get_status():
response = client.get("/status")
assert response.status_code == 200
assert response.json()["status"] == "operational"
def test_get_current_price_no_data(mocker):
# Mock database connection to return no data
mock_db = mocker.patch("utils.database.get_connection")
mock_con = mock_db.return_value.__enter__.return_value
mock_con.execute.return_value.fetchone.return_value = None
response = client.get("/current-price")
assert response.status_code == 404
assert response.json()["detail"] == "No data available"

View file

20
tests/test_smard.py Normal file
View file

@ -0,0 +1,20 @@
import polars as pl
import requests_mock
from collectors.smard import fetch_smard_data
def test_fetch_smard_data():
with requests_mock.Mocker() as m:
# Mock index call
m.get("https://www.smard.de/app/chart_data/4169/DE-LU/index_hour.json",
json={"timestamps": [1700000000000]})
# Mock data call
m.get("https://www.smard.de/app/chart_data/4169/DE-LU/4169_DE-LU_hour_1700000000000.json",
json={
"series": [[1700000000000, 50.0], [1700003600000, 60.0]]
})
df = fetch_smard_data(filter_id=4169)
assert isinstance(df, pl.DataFrame)
assert df.shape == (2, 2)
assert "value" in df.columns
assert df["value"][0] == 50.0

View file

@ -0,0 +1,28 @@
import polars as pl
from transformators.transformator import (
transform_prices,
transform_weather,
)
def test_transform_prices():
df = pl.DataFrame({
"timestamp": [1700000000000],
"value": [50.0]
})
transformed = transform_prices(df)
assert transformed.shape == (1, 2)
assert transformed["timestamp"].dtype == pl.Datetime("ms", time_zone="UTC")
def test_transform_weather():
df = pl.DataFrame({
"timestamp": ["2023-11-14T22:13:20+00:00"],
"temperature": [10.5],
"wind_speed": [5.0],
"solar": [0.0],
"sunshine": [0.0],
"cloud_cover": [0.0],
"precipitation": [0.0]
})
transformed = transform_weather(df)
assert transformed.shape == (1, 7)
assert transformed["timestamp"].dtype == pl.Datetime("ms", time_zone="UTC")

View file

@ -0,0 +1,27 @@
import polars as pl
import requests_mock
from collectors.weather import fetch_weather
def test_fetch_weather():
with requests_mock.Mocker() as m:
m.get("https://api.brightsky.dev/weather", json={
"weather": [
{
"timestamp": "2023-11-14T22:13:20+00:00",
"temperature": 10.5,
"wind_speed": 5.0,
"solar": 0.0,
"sunshine": 0.0,
"cloud_cover": 0.0,
"precipitation": 0.0,
"extra_field": "should_be_filtered"
}
]
})
df = fetch_weather()
assert isinstance(df, pl.DataFrame)
# Check that it filtered to the schema columns (7 columns)
assert df.shape == (1, 7)
assert "temperature" in df.columns
assert "extra_field" not in df.columns

View file

@ -1,76 +1,40 @@
"""
Data transformation logic for price and weather data.
"""
from datetime import timedelta
import polars as pl
def transform_weather(df: pl.DataFrame) -> pl.DataFrame:
"""
Cleans and standardizes raw weather data.
"""
if df.is_empty():
return pl.DataFrame()
relevant_cols = [
"timestamp", "temperature", "wind_speed", "solar",
"sunshine", "cloud_cover", "precipitation"
]
# Filter for existing relevant columns
cols_to_keep = [c for c in relevant_cols if c in df.columns]
return df.select(cols_to_keep).with_columns(
pl.col("timestamp")
.str.to_datetime(format="%Y-%m-%dT%H:%M:%S%z", time_zone="UTC")
.cast(pl.Datetime("ms", time_zone="UTC"))
).sort("timestamp")
def transform_prices(df: pl.DataFrame) -> pl.DataFrame:
"""
Transforms raw SMARD timestamps to UTC Datetime.
"""
if df.is_empty():
return pl.DataFrame()
return df.with_columns(
[
pl.col("timestamp")
.cast(pl.Datetime("ms"))
.dt.replace_time_zone("UTC")
.alias("timestamp")
]
)
def transform_weather(df: pl.DataFrame) -> pl.DataFrame:
return df.with_columns(
[
pl.col("timestamp")
.str.to_datetime(
format="%Y-%m-%dT%H:%M:%S%z",
time_zone="UTC",
)
.cast(pl.Datetime("ms", time_zone="UTC"))
.alias("timestamp")
]
)
def join_dataframes(df_prices: pl.DataFrame, df_weather: pl.DataFrame) -> pl.DataFrame:
return df_prices.join(df_weather, on="timestamp", how="inner").select(
[
pl.col("timestamp"),
pl.col("price"),
pl.col(
[
"temperature",
"wind_speed",
"solar",
"sunshine",
"cloud_cover",
"precipitation",
]
),
]
)
#
#
# def join_dataframes(df_prices: pl.DataFrame, df_weather: pl.DataFrame) -> pl.DataFrame:
# return (
# df_prices.sort("timestamp") # ← Pflicht!
# .join_asof(
# df_weather.sort("timestamp"), # ← Pflicht!
# on="timestamp",
# strategy="nearest", # ← nearest statt backward!
# tolerance=timedelta(hours=1),
# )
# .select(
# [
# pl.col("timestamp"),
# pl.col("price"),
# pl.col(
# [
# "temperature",
# "wind_speed",
# "solar",
# "sunshine",
# "cloud_cover",
# "precipitation",
# ]
# ),
# ]
# )
# )
).sort("timestamp")

View file

@ -0,0 +1,54 @@
from pathlib import Path
import yaml
from pydantic import BaseModel
from pydantic_settings import BaseSettings, SettingsConfigDict
class SmardConfig(BaseModel):
region: str = "DE-LU"
price_filter: int = 4169
load_forecast_filter: int = 4382
generation_total_filter: int = 122
wind_onshore_filter: int = 4069
wind_offshore_filter: int = 4068
pv_filter: int = 4070
base_url: str = "https://www.smard.de/app/chart_data"
class BrightSkyConfig(BaseModel):
lat: float = 52.52
lon: float = 13.41
base_url: str = "https://api.brightsky.dev/weather"
class DatabaseConfig(BaseModel):
path: str = "output/pipeline.duckdb"
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_nested_delimiter="__",
env_prefix="STROM_",
extra="ignore"
)
smard: SmardConfig = SmardConfig()
brightsky: BrightSkyConfig = BrightSkyConfig()
database: DatabaseConfig = DatabaseConfig()
def load_config(config_path: str = "config/config.yaml") -> Settings:
path = Path(config_path)
if not path.exists():
return Settings()
with open(path, "r") as f:
config_data = yaml.safe_load(f) or {}
return Settings(**config_data)
# Global settings instance
settings = load_config()

View file

@ -1,10 +1,17 @@
"""
DuckDB database interface for Bronze (Raw) and Gold (Combined) layers.
"""
import duckdb
import polars as pl
from contextlib import contextmanager
from utils.config_loader import settings
@contextmanager
def get_connection(db_path: str = "output/pipeline.duckdb"):
def get_connection(db_path: str = None):
"""Context manager for DuckDB connections."""
db_path = db_path or settings.database.path
con = duckdb.connect(db_path)
try:
yield con
@ -13,23 +20,26 @@ def get_connection(db_path: str = "output/pipeline.duckdb"):
def init_tables(con: duckdb.DuckDBPyConnection):
con.execute("""
CREATE TABLE IF NOT EXISTS prices_raw (
timestamp BIGINT NOT NULL UNIQUE,
price DOUBLE NOT NULL
"""
Initializes the database schema following the Medallion architecture.
"""
# BRONZE LAYER (Raw API responses)
smard_tables = [
"prices_raw",
"load_forecast_raw",
"gen_total_raw",
"wind_onshore_raw",
"wind_offshore_raw",
"pv_raw",
]
for table in smard_tables:
con.execute(
f"CREATE TABLE IF NOT EXISTS {table} (timestamp BIGINT PRIMARY KEY, value DOUBLE)"
)
""")
con.execute("""
CREATE TABLE IF NOT EXISTS prices_clean (
timestamp TIMESTAMP NOT NULL UNIQUE,
price DOUBLE NOT NULL
)
""")
con.execute("""
CREATE TABLE IF NOT EXISTS weather_raw (
timestamp TIMESTAMP NOT NULL UNIQUE,
timestamp VARCHAR PRIMARY KEY,
temperature DOUBLE,
wind_speed DOUBLE,
solar DOUBLE,
@ -39,10 +49,15 @@ def init_tables(con: duckdb.DuckDBPyConnection):
)
""")
# GOLD LAYER (Transformed and Joined Business Data)
con.execute("""
CREATE TABLE IF NOT EXISTS combined (
timestamp TIMESTAMP NOT NULL UNIQUE,
price DOUBLE NOT NULL,
timestamp TIMESTAMP PRIMARY KEY,
price DOUBLE,
load_forecast DOUBLE,
generation_total DOUBLE,
wind_total DOUBLE,
pv DOUBLE,
temperature DOUBLE,
wind_speed DOUBLE,
solar DOUBLE,
@ -53,25 +68,23 @@ def init_tables(con: duckdb.DuckDBPyConnection):
""")
def upsert_prices(con: duckdb.DuckDBPyConnection, df: pl.DataFrame):
def upsert_raw(con: duckdb.DuckDBPyConnection, table_name: str, df: pl.DataFrame):
"""Inserts raw data using explicit columns to match the target table schema."""
if df.is_empty():
return
cols = con.execute(f"DESCRIBE {table_name}").pl()["column_name"].to_list()
df_to_insert = df.select([c for c in cols if c in df.columns])
con.execute(
"""INSERT INTO prices_raw SELECT * FROM df ON CONFLICT (timestamp) DO NOTHING;"""
)
def upsert_prices_clean(con: duckdb.DuckDBPyConnection, df: pl.DataFrame):
con.execute(
"""INSERT INTO prices_clean SELECT * FROM df ON CONFLICT (timestamp) DO NOTHING;"""
)
def upsert_weather(con: duckdb.DuckDBPyConnection, df: pl.DataFrame):
con.execute(
"""INSERT INTO weather_raw SELECT * FROM df ON CONFLICT (timestamp) DO NOTHING;"""
f"INSERT INTO {table_name} SELECT * FROM df_to_insert ON CONFLICT (timestamp) DO NOTHING;"
)
def upsert_combined(con: duckdb.DuckDBPyConnection, df: pl.DataFrame):
"""Inserts business-ready data into the Gold layer."""
if df.is_empty():
return
con.execute(
"""INSERT INTO combined SELECT * FROM df ON CONFLICT (timestamp) DO NOTHING;"""
"INSERT INTO combined SELECT * FROM df ON CONFLICT (timestamp) DO NOTHING;"
)

View file

@ -1,23 +1,41 @@
import logging
import requests
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
logger = logging.getLogger(__name__)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
retry=retry_if_exception_type((requests.exceptions.RequestException,)),
reraise=True,
)
def make_requests(
url: str, headers: dict = {}, payload: dict = {}, timeout: int = 20
url: str, headers: dict = None, params: dict = None, timeout: int = 20
) -> dict:
headers = headers or {}
params = params or {}
try:
res = requests.get(url, headers=headers, params=payload, timeout=timeout)
logger.debug(f"Requesting URL: {url} with params: {params}")
res = requests.get(url, headers=headers, params=params, timeout=timeout)
res.raise_for_status()
return res.json()
except requests.exceptions.HTTPError as errh:
print("HTTP Error")
print(errh.args[0])
raise errh
logger.error(f"HTTP Error: {errh}")
raise
except requests.ConnectionError as errc:
print("http connection Error")
raise errc
logger.error(f"Connection Error: {errc}")
raise
except requests.exceptions.Timeout as errt:
print("http connection timeout")
raise errt
logger.error(f"Timeout Error: {errt}")
raise
except Exception as e:
print("unknown exception")
raise e
logger.error(f"Unknown Exception: {e}")
raise

1172
uv.lock generated

File diff suppressed because it is too large Load diff