134 lines
4 KiB
Python
134 lines
4 KiB
Python
"""
|
||
07_pipeline_health.py – Performance-Analyse der internen Pipeline-Stufen
|
||
=======================================================================
|
||
Extrahiert Metriken aus pipeline.log:
|
||
- Latenz pro Stage (avg_latency_ms)
|
||
- Durchsatz pro Stage (throughput_eps)
|
||
- Events verarbeitet/verworfen
|
||
"""
|
||
|
||
import json
|
||
import re
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
import matplotlib.pyplot as plt
|
||
import pandas as pd
|
||
import seaborn as sns
|
||
|
||
sys.path.insert(0, ".")
|
||
from config import *
|
||
|
||
LOG_PATTERN = re.compile(r"^(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(.*?)\] (\{.*\})$")
|
||
|
||
|
||
def parse_pipeline_log(run_key: str) -> pd.DataFrame:
|
||
"""Liest pipeline.log eines Laufs und extrahiert die Health-Metriken."""
|
||
_, run_dir = RUNS[run_key]
|
||
log_path = ROOT / run_dir / "pipeline.log"
|
||
|
||
if not log_path.exists():
|
||
return pd.DataFrame()
|
||
|
||
records = []
|
||
with open(log_path, "r") as f:
|
||
for line in f:
|
||
match = LOG_PATTERN.match(line.strip())
|
||
if match:
|
||
log_ts, stage_label, json_str = match.groups()
|
||
try:
|
||
data = json.loads(json_str)
|
||
data["log_timestamp"] = pd.to_datetime(
|
||
log_ts, format="%Y/%m/%d %H:%M:%S"
|
||
)
|
||
data["run"] = run_key
|
||
records.append(data)
|
||
except json.JSONDecodeError:
|
||
continue
|
||
|
||
return pd.DataFrame(records)
|
||
|
||
|
||
def main():
|
||
print_section("07 – Pipeline Health & Latency Analysis")
|
||
|
||
all_data = []
|
||
|
||
for run_key in RUNS:
|
||
df_run = parse_pipeline_log(run_key)
|
||
if not df_run.empty:
|
||
print(f" {run_key}: {len(df_run)} Health-Einträge extrahiert.")
|
||
csv_path = OUTPUT_DIR / f"health_{run_key}.csv"
|
||
df_run.to_csv(csv_path, index=False)
|
||
all_data.append(df_run)
|
||
|
||
if not all_data:
|
||
print(" Keine pipeline.log Dateien gefunden.")
|
||
return
|
||
|
||
df = pd.concat(all_data, ignore_index=True)
|
||
|
||
# --- Statistik berechnen ---
|
||
stats_df = (
|
||
df.groupby("stage_name")
|
||
.agg(
|
||
{
|
||
"avg_latency_ms": ["median", "mean", "std", "max"],
|
||
"throughput_eps": ["median", "max"],
|
||
"events_processed": "sum",
|
||
"events_dropped": "sum",
|
||
}
|
||
)
|
||
.round(3)
|
||
)
|
||
|
||
print("\nStatistik der Pipeline-Stages (Latenz in ms):")
|
||
print(stats_df.to_string())
|
||
|
||
stats_df.to_csv(OUTPUT_DIR / "07_pipeline_health_stats.csv")
|
||
|
||
# --- Visualisierung 1: Latenzen pro Stage ---
|
||
plt.figure(figsize=(12, 7))
|
||
sns.boxplot(data=df, x="stage_name", y="avg_latency_ms", palette="viridis")
|
||
plt.yscale("symlog", linthresh=1)
|
||
plt.title("Latenz-Verteilung der Pipeline-Komponenten (REQ-NF06)")
|
||
plt.ylabel("Durchschnittliche Latenz [ms] (Log-Skala)")
|
||
plt.xlabel("Pipeline Stage")
|
||
plt.xticks(rotation=15)
|
||
plt.grid(True, which="both", axis="y", alpha=0.3)
|
||
save_fig("07_pipeline_latency_boxplot")
|
||
|
||
# --- Visualisierung 2: Durchsatz pro Stage ---
|
||
plt.figure(figsize=(12, 6))
|
||
sns.barplot(
|
||
data=df,
|
||
x="stage_name",
|
||
y="throughput_eps",
|
||
estimator="median",
|
||
errorbar="sd",
|
||
palette="magma",
|
||
)
|
||
plt.title("Medianer Durchsatz der Pipeline-Komponenten")
|
||
plt.ylabel("Events pro Sekunde [EPS]")
|
||
plt.xlabel("Pipeline Stage")
|
||
plt.xticks(rotation=15)
|
||
plt.grid(True, axis="y", alpha=0.3)
|
||
save_fig("07_pipeline_throughput_bar")
|
||
|
||
with open(OUTPUT_DIR / "07_health_interpretation.txt", "w") as f:
|
||
f.write("Pipeline Health & Performance Summary\n")
|
||
f.write("=" * 40 + "\n\n")
|
||
f.write(stats_df.to_string())
|
||
f.write("\n\nInterpretation:\n")
|
||
f.write(
|
||
"- Die Stage mit der höchsten Latenz ist der Flaschenhals der Pipeline.\n"
|
||
)
|
||
f.write(
|
||
"- 'events_dropped' > 0 deutet auf aktives Load-Shedding hin (REQ-NF04).\n"
|
||
)
|
||
|
||
print(f"\n→ Ergebnisse gespeichert in {OUTPUT_DIR}/07_pipeline_*")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|