bachelor-thesis/evaluation/07_pipeline_health.py

134 lines
4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
07_pipeline_health.py - Performance-Analyse der internen Pipeline-Stufen
=======================================================================
Extrahiert Metriken aus pipeline.log:
- Latenz pro Stage (avg_latency_ms)
- Durchsatz pro Stage (throughput_eps)
- Events verarbeitet/verworfen
"""
import json
import re
import sys
from pathlib import Path
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
# Put the current working directory first on the import path so that the
# `config` module imported below is looked up there first.
sys.path.insert(0, ".")
# NOTE(review): RUNS, ROOT, OUTPUT_DIR, print_section and save_fig are used in
# this file without being defined here; presumably this star import provides
# them - confirm against config.py.
from config import *
# Shape of one accepted log line: "YYYY/MM/DD HH:MM:SS [label] {json}".
# Capture groups: (1) timestamp, (2) bracketed label, (3) JSON object text.
LOG_PATTERN = re.compile(r"^(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(.*?)\] (\{.*\})$")
def parse_pipeline_log(run_key: str) -> pd.DataFrame:
    """Read the pipeline.log of one run and extract its health metrics.

    Every line that matches ``LOG_PATTERN`` (timestamp, bracketed label, JSON
    object) and whose JSON part decodes successfully becomes one row. The
    decoded JSON object supplies the columns; two more are added:
    ``log_timestamp`` (the parsed line timestamp) and ``run`` (``run_key``).
    Lines that do not match, or whose JSON is invalid, are skipped silently.

    Args:
        run_key: Key into ``RUNS``. The second element of the mapped value is
            used as the run directory below ``ROOT``.

    Returns:
        One row per usable log line, or an empty DataFrame when the log file
        does not exist or contains no usable line.

    Raises:
        KeyError: If ``run_key`` is not present in ``RUNS``.
    """
    _, run_dir = RUNS[run_key]
    log_path = ROOT / run_dir / "pipeline.log"
    if not log_path.exists():
        return pd.DataFrame()

    records = []
    # Decode explicitly as UTF-8 so the result does not depend on the platform
    # locale; a stray undecodable byte is replaced instead of aborting the run.
    with open(log_path, "r", encoding="utf-8", errors="replace") as f:
        for line in f:
            match = LOG_PATTERN.match(line.strip())
            if match is None:
                continue
            # The bracketed label is matched but intentionally not stored.
            log_ts, _stage_label, json_str = match.groups()
            # Keep the try body minimal: only the JSON decoding is best-effort.
            try:
                data = json.loads(json_str)
            except json.JSONDecodeError:
                continue
            data["log_timestamp"] = pd.to_datetime(
                log_ts, format="%Y/%m/%d %H:%M:%S"
            )
            data["run"] = run_key
            records.append(data)
    return pd.DataFrame(records)
def _collect_health_frames() -> list:
    """Parse every run in RUNS and export each non-empty result as a CSV file."""
    frames = []
    for run_key in RUNS:
        df_run = parse_pipeline_log(run_key)
        if df_run.empty:
            continue
        print(f" {run_key}: {len(df_run)} Health-Einträge extrahiert.")
        csv_path = OUTPUT_DIR / f"health_{run_key}.csv"
        df_run.to_csv(csv_path, index=False)
        frames.append(df_run)
    return frames


def _compute_stage_stats(df: pd.DataFrame) -> pd.DataFrame:
    """Aggregate latency, throughput and event counters per ``stage_name``."""
    return (
        df.groupby("stage_name")
        .agg(
            {
                "avg_latency_ms": ["median", "mean", "std", "max"],
                "throughput_eps": ["median", "max"],
                "events_processed": "sum",
                "events_dropped": "sum",
            }
        )
        .round(3)
    )


def _plot_latency_boxplot(df: pd.DataFrame) -> None:
    """Draw the per-stage latency box plot and hand it to ``save_fig``."""
    plt.figure(figsize=(12, 7))
    sns.boxplot(data=df, x="stage_name", y="avg_latency_ms", palette="viridis")
    # symlog is linear within +/- linthresh and logarithmic beyond it, so
    # latencies of 0 ms can still be drawn.
    plt.yscale("symlog", linthresh=1)
    plt.title("Latenz-Verteilung der Pipeline-Komponenten (REQ-NF06)")
    plt.ylabel("Durchschnittliche Latenz [ms] (Log-Skala)")
    plt.xlabel("Pipeline Stage")
    plt.xticks(rotation=15)
    plt.grid(True, which="both", axis="y", alpha=0.3)
    save_fig("07_pipeline_latency_boxplot")


def _plot_throughput_bars(df: pd.DataFrame) -> None:
    """Draw the per-stage median throughput bar chart and hand it to ``save_fig``."""
    plt.figure(figsize=(12, 6))
    sns.barplot(
        data=df,
        x="stage_name",
        y="throughput_eps",
        estimator="median",
        errorbar="sd",
        palette="magma",
    )
    plt.title("Medianer Durchsatz der Pipeline-Komponenten")
    plt.ylabel("Events pro Sekunde [EPS]")
    plt.xlabel("Pipeline Stage")
    plt.xticks(rotation=15)
    plt.grid(True, axis="y", alpha=0.3)
    save_fig("07_pipeline_throughput_bar")


def _write_interpretation(stats_df: pd.DataFrame) -> None:
    """Write the statistics table and the fixed interpretation notes to a text file."""
    # The notes contain non-ASCII characters, so the encoding is fixed to
    # UTF-8 rather than left to the platform default.
    report_path = OUTPUT_DIR / "07_health_interpretation.txt"
    with open(report_path, "w", encoding="utf-8") as f:
        f.write("Pipeline Health & Performance Summary\n")
        f.write("=" * 40 + "\n\n")
        f.write(stats_df.to_string())
        f.write("\n\nInterpretation:\n")
        f.write(
            "- Die Stage mit der höchsten Latenz ist der Flaschenhals der Pipeline.\n"
        )
        f.write(
            "- 'events_dropped' > 0 deutet auf aktives Load-Shedding hin (REQ-NF04).\n"
        )


def main():
    """Run the pipeline health analysis across all configured runs.

    Steps, in order: parse the pipeline.log of every run in ``RUNS`` (writing
    one ``health_<run>.csv`` per non-empty run), compute per-stage statistics
    and save them as CSV, render the latency box plot and the throughput bar
    chart, and write a text summary. Returns early with a message when no run
    yields any health entry (missing log file or no usable log line).
    """
    print_section("07 Pipeline Health & Latency Analysis")

    frames = _collect_health_frames()
    if not frames:
        print(" Keine pipeline.log Dateien gefunden.")
        return
    df = pd.concat(frames, ignore_index=True)

    # --- Compute statistics ---
    stats_df = _compute_stage_stats(df)
    print("\nStatistik der Pipeline-Stages (Latenz in ms):")
    print(stats_df.to_string())
    stats_df.to_csv(OUTPUT_DIR / "07_pipeline_health_stats.csv")

    # --- Visualization 1: latency per stage ---
    _plot_latency_boxplot(df)

    # --- Visualization 2: throughput per stage ---
    _plot_throughput_bars(df)

    _write_interpretation(stats_df)
    print(f"\n→ Ergebnisse gespeichert in {OUTPUT_DIR}/07_pipeline_*")


# Run the analysis only when the file is executed as a script.
if __name__ == "__main__":
    main()