50 lines
1.5 KiB
Python
50 lines
1.5 KiB
Python
import sys
|
|
from collectors import pricing, weather
|
|
from transformators import transformator
|
|
from utils import database as db
|
|
import logging
|
|
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def main():
|
|
logger.info("Starte Strompreis-Pipeline")
|
|
|
|
logger.info("Hole Preisdaten...")
|
|
df_prices_raw = pricing.fetch_prices()
|
|
logger.info(f"Preise: {len(df_prices_raw)} Zeilen")
|
|
|
|
logger.info("Hole Wetterdaten...")
|
|
df_weather_raw = weather.fetch_weather()
|
|
logger.info(f"Wetter: {len(df_weather_raw)} Zeilen")
|
|
|
|
logger.info("Transformiere Daten...")
|
|
df_prices_clean = transformator.transform_prices(df_prices_raw)
|
|
df_weather_clean = transformator.transform_weather(df_weather_raw)
|
|
df_joined = transformator.join_dataframes(df_prices_clean, df_weather_clean)
|
|
logger.info(f"Joined: {len(df_joined)} Zeilen")
|
|
|
|
logger.info("Speichere in DuckDB...")
|
|
with db.get_connection() as con:
|
|
db.init_tables(con)
|
|
db.upsert_prices(con, df_prices_raw)
|
|
db.upsert_prices_clean(con, df_prices_clean)
|
|
db.upsert_weather(con, df_weather_raw)
|
|
db.upsert_combined(con, df_joined)
|
|
logger.info("Erfolgreich zur Datenbank hinzugefügt.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
main()
|
|
except KeyboardInterrupt:
|
|
logger.info("Pipeline durch Benutzer gestoppt")
|
|
sys.exit(0)
|
|
except Exception as e:
|
|
logger.error(f"Pipeline fehlgeschlagen: {e}", exc_info=True)
|
|
sys.exit(1)
|