""" 02_throughput.py – Primary Workload Protection (REQ-NF01) ========================================================= Ausgabe: - output/02_throughput_table.csv - output/02_throughput_comparison_boxplot.pdf/png - output/02_throughput_stats.txt """ import sys from pathlib import Path sys.path.insert(0, ".") from config import * def _to_utc(ts): ts = pd.Timestamp(ts) return ts.tz_localize("UTC") if ts.tzinfo is None else ts.tz_convert("UTC") def load_phase_throughput(run_key: str, phase_windows: pd.DataFrame) -> pd.DataFrame: """ Berechnet den mittleren Durchsatz pro Phase eines Laufs aus baseline_metrics.csv. """ try: bm = load_baseline_metrics(run_key) except Exception as e: print(f" WARNUNG {run_key}: baseline_metrics.csv fehlt ({e})") return pd.DataFrame() bm["timestamp"] = bm["timestamp"].apply(_to_utc) run_phases = phase_windows[phase_windows["run"] == run_key] results = [] for _, row in run_phases.iterrows(): t0 = _to_utc(row["t_start"]) t1 = _to_utc(row["t_end"]) mask = (bm["timestamp"] >= t0) & (bm["timestamp"] <= t1) phase_data = bm.loc[mask, "network_sent_mb_s"] if not phase_data.empty: avg_tp = phase_data.mean() if avg_tp > 5.0: results.append( { "run": run_key, "phase": row["phase"], "throughput_mbs": avg_tp, "is_pipeline_active": run_key not in VALIDATION_RUNS, } ) return pd.DataFrame(results) def main(): print_section("02 – Transferdurchsatz-Vergleich (REQ-NF01)") print(" Ziel: Durchsatz mit vs. ohne Pipeline ( network_sent_mb_s )") print(" Referenz: validation_run1-3 (FULL_CYCLE, Pipeline INAKTIV)") print(" Test: full_cycle_run1-3 (FULL_CYCLE, Pipeline AKTIV)") phase_windows = pd.read_csv(OUTPUT_DIR / "phase_windows.csv") REFERENCE_RUNS = VALIDATION_RUNS TEST_RUNS = WORKLOAD_PROFILES["full_cycle"] all_results = [] for run_key in REFERENCE_RUNS + TEST_RUNS: res = load_phase_throughput(run_key, phase_windows) if not res.empty: all_results.append(res) if not all_results: print(" FEHLER: Keine Durchsatzdaten gefunden.") return df = pd.concat(all_results, ignore_index=True) comparison = [] PHASE_MAPPING = { "BW": "High Bandwidth", "IOPS": "High IOPS", "BATCH_OUT": "Batch Out", } for phase_key, profile_label in PHASE_MAPPING.items(): sub = df[df["phase"] == phase_key] if sub.empty: continue def get_run_stats(data_sub): run_medians = data_sub.groupby("run")["throughput_mbs"].median() return run_medians.mean(), run_medians.std(), run_medians.count() with_p_sub = sub[sub["is_pipeline_active"] == True] without_p_sub = sub[sub["is_pipeline_active"] == False] if with_p_sub.empty or without_p_sub.empty: continue mean_med_with, std_med_with, n_runs_with = get_run_stats(with_p_sub) mean_med_without, std_med_without, n_runs_without = get_run_stats(without_p_sub) diff_pct = (mean_med_with - mean_med_without) / mean_med_without * 100 stat, p_val = wilcoxon_test( with_p_sub["throughput_mbs"].values, without_p_sub["throughput_mbs"].values ) r_val = rosenthal_r(stat, len(with_p_sub), len(without_p_sub)) comparison.append( { "Profil": profile_label, "Runs (Test/Ref)": f"{int(n_runs_with)}/{int(n_runs_without)}", "Median_Ref (MB/s)": f"{mean_med_without:.1f} ± {std_med_without:.2f}", "Median_Test (MB/s)": f"{mean_med_with:.1f} ± {std_med_with:.2f}", "Diff (%)": diff_pct, "p-Wert (MWU)": p_val, "Effektstärke (r)": r_val, } ) comp_df = pd.DataFrame(comparison) print("\nVergleichstabelle Durchsatz (validation_runs vs. full_cycle_runs):") print( comp_df.to_string( index=False, formatters={ "Diff (%)": "{:+.2f}%".format, "p-Wert (MWU)": "{:.4f}".format, "Effektstärke (r)": "{:.3f}".format, }, ) ) comp_df.to_csv(OUTPUT_DIR / "02_throughput_table.csv", index=False) plt.figure(figsize=(10, 6)) df_plot = df[df["phase"].isin(PHASE_MAPPING.keys())].copy() df_plot["Profil"] = df_plot["phase"].map(PHASE_MAPPING) df_plot["Status"] = df_plot["is_pipeline_active"].map( {True: "Pipeline Aktiv (full_cycle)", False: "Ohne Pipeline (validation)"} ) sns.boxplot( data=df_plot, x="Profil", y="throughput_mbs", hue="Status", palette="Set2", showmeans=True, meanprops={ "marker": "o", "markerfacecolor": "white", "markeredgecolor": "black", "markersize": "5", }, ) plt.title( "Durchsatz-Vergleich (REQ-NF01): FULL_CYCLE mit vs. ohne Pipeline\n" "(Statistik: Mann-Whitney-U-Test)" ) plt.ylabel("Durchsatz (MB/s)") plt.grid(axis="y", alpha=0.3) save_fig("02_throughput_comparison_boxplot") # Report with open(OUTPUT_DIR / "02_throughput_stats.txt", "w") as f: f.write("Evaluation REQ-NF01: Workload Protection (Robust Statistics)\n") f.write("=" * 65 + "\n\n") f.write("Vergleichsgruppen:\n") f.write(f" Referenz (Pipeline inaktiv): {', '.join(REFERENCE_RUNS)}\n") f.write(f" Test (Pipeline aktiv): {', '.join(TEST_RUNS)}\n") f.write(f" Workload-Profil: FULL_CYCLE (identisch)\n\n") f.write(comp_df.to_string(index=False)) f.write("\n\nInterpretation:\n") for _, row in comp_df.iterrows(): is_significant = row["p-Wert (MWU)"] < ALPHA status = ( "ERFÜLLT" if (row["Diff (%)"] > -1.0 or not is_significant) else "KRITISCH" ) f.write(f"- {row['Profil']}:\n") f.write( f" Abweichung: {row['Diff (%)']:+.2f}%, p-Wert: {row['p-Wert (MWU)']:.4f}\n" ) f.write( f" Effektstärke r: {row['Effektstärke (r)']:.3f} ({effect_size_label(row['Effektstärke (r)'] or 0)})\n" ) f.write(f" Ergebnis: REQ-NF01 {status}\n") f.write("\nMethodischer Hinweis:\n") f.write( "Aufgrund der Nicht-Normalverteilung der Durchsatzdaten wurde der Mann-Whitney-U-Test\n" ) f.write( "verwendet. Als Effektstärke wird Rosenthal's r (Z / sqrt(N)) berechnet.\n" ) f.write("Ein p-Wert > 0.05 indiziert, dass kein statistisch signifikanter\n") f.write( "Unterschied zwischen den Gruppen (mit/ohne Pipeline) nachgewiesen werden kann.\n" ) f.write( "Beide Vergleichsgruppen verwenden dasselbe FULL_CYCLE-Workload-Profil.\n" ) print(f"\n→ Ergebnisse gespeichert in {OUTPUT_DIR}/02_throughput_*") if __name__ == "__main__": main()