""" 07_pipeline_health.py – Performance-Analyse der internen Pipeline-Stufen ======================================================================= Extrahiert Metriken aus pipeline.log: - Latenz pro Stage (avg_latency_ms) - Durchsatz pro Stage (throughput_eps) - Events verarbeitet/verworfen """ import json import re import sys from pathlib import Path import matplotlib.pyplot as plt import pandas as pd import seaborn as sns sys.path.insert(0, ".") from config import * LOG_PATTERN = re.compile(r"^(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(.*?)\] (\{.*\})$") def parse_pipeline_log(run_key: str) -> pd.DataFrame: """Liest pipeline.log eines Laufs und extrahiert die Health-Metriken.""" _, run_dir = RUNS[run_key] log_path = ROOT / run_dir / "pipeline.log" if not log_path.exists(): return pd.DataFrame() records = [] with open(log_path, "r") as f: for line in f: match = LOG_PATTERN.match(line.strip()) if match: log_ts, stage_label, json_str = match.groups() try: data = json.loads(json_str) data["log_timestamp"] = pd.to_datetime( log_ts, format="%Y/%m/%d %H:%M:%S" ) data["run"] = run_key records.append(data) except json.JSONDecodeError: continue return pd.DataFrame(records) def main(): print_section("07 – Pipeline Health & Latency Analysis") all_data = [] for run_key in RUNS: df_run = parse_pipeline_log(run_key) if not df_run.empty: print(f" {run_key}: {len(df_run)} Health-Einträge extrahiert.") csv_path = OUTPUT_DIR / f"health_{run_key}.csv" df_run.to_csv(csv_path, index=False) all_data.append(df_run) if not all_data: print(" Keine pipeline.log Dateien gefunden.") return df = pd.concat(all_data, ignore_index=True) # --- Statistik berechnen --- stats_df = ( df.groupby("stage_name") .agg( { "avg_latency_ms": ["median", "mean", "std", "max"], "throughput_eps": ["median", "max"], "events_processed": "sum", "events_dropped": "sum", } ) .round(3) ) print("\nStatistik der Pipeline-Stages (Latenz in ms):") print(stats_df.to_string()) stats_df.to_csv(OUTPUT_DIR / "07_pipeline_health_stats.csv") # --- Visualisierung 1: Latenzen pro Stage --- plt.figure(figsize=(12, 7)) sns.boxplot(data=df, x="stage_name", y="avg_latency_ms", palette="viridis") plt.yscale("symlog", linthresh=1) plt.title("Latenz-Verteilung der Pipeline-Komponenten (REQ-NF06)") plt.ylabel("Durchschnittliche Latenz [ms] (Log-Skala)") plt.xlabel("Pipeline Stage") plt.xticks(rotation=15) plt.grid(True, which="both", axis="y", alpha=0.3) save_fig("07_pipeline_latency_boxplot") # --- Visualisierung 2: Durchsatz pro Stage --- plt.figure(figsize=(12, 6)) sns.barplot( data=df, x="stage_name", y="throughput_eps", estimator="median", errorbar="sd", palette="magma", ) plt.title("Medianer Durchsatz der Pipeline-Komponenten") plt.ylabel("Events pro Sekunde [EPS]") plt.xlabel("Pipeline Stage") plt.xticks(rotation=15) plt.grid(True, axis="y", alpha=0.3) save_fig("07_pipeline_throughput_bar") with open(OUTPUT_DIR / "07_health_interpretation.txt", "w") as f: f.write("Pipeline Health & Performance Summary\n") f.write("=" * 40 + "\n\n") f.write(stats_df.to_string()) f.write("\n\nInterpretation:\n") f.write( "- Die Stage mit der höchsten Latenz ist der Flaschenhals der Pipeline.\n" ) f.write( "- 'events_dropped' > 0 deutet auf aktives Load-Shedding hin (REQ-NF04).\n" ) print(f"\n→ Ergebnisse gespeichert in {OUTPUT_DIR}/07_pipeline_*") if __name__ == "__main__": main()