"""
03_resource_overhead.py – Resource efficiency (REQ-NF01, NF04, NF05)
==============================================================================
Difference-based overhead computation:

  Level 0: system_baseline
           OS + MFT software idle, no pipeline, no transfer
           → absolute zero point

  Level 1: pipeline_idle_baseline − system_baseline
           = PIPELINE OVERHEAD (isolated)
           → directly answers REQ-NF1 and REQ-NF2

  Level 2: workload_baseline_phase − pipeline_idle_baseline
           = TRANSFER OVERHEAD (MFT engine + TIXstream under load)
           → separates pipeline overhead from transfer overhead

  Level 3: workload_injection_phase − workload_baseline_phase
           = CHAOS IMPACT on resources

All RAM and CPU values are presented as DIFFERENCES relative to the respective
zero point, not as absolute values.

Output:
  - output/03_resource_table.csv
  - output/03_resource_overhead_stacked.pdf/png (stacked differences)
  - output/03_cpu_overhead_timeseries_{run}.pdf/png
  - output/03_resource_stats.txt
"""

import sys

sys.path.insert(0, ".")
# NOTE(review): pd, np, plt, mdates, Path, ROOT, RUNS, OUTPUT_DIR, PALETTE,
# WORKLOAD_PROFILES, save_fig and print_section are not defined in this file;
# presumably they are all provided by this star import.
from config import *

SYSTEM_BASELINE_DIR = ROOT / "pipeline_system_baseline"
IDLE_BASELINE_DIR = ROOT / "pipeline_idle_pipeline_baseline"

# Shift applied to the (timezone-stripped) ground-truth timestamps before they
# are labelled as UTC.
# NOTE(review): presumably the GT files are written in UTC+1 local time — confirm.
GT_OFFSET = pd.Timedelta("-1h")


def _to_utc(ts):
    """Return *ts* as a timezone-aware UTC ``pd.Timestamp``.

    Naive inputs are interpreted as UTC; aware inputs are converted to UTC.
    """
    ts = pd.Timestamp(ts)
    return ts.tz_localize("UTC") if ts.tzinfo is None else ts.tz_convert("UTC")


def _read_metrics_csv(path) -> pd.DataFrame:
    """Read a ``baseline_metrics.csv`` and parse its epoch-second timestamps as UTC."""
    df = pd.read_csv(path)
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s", utc=True)
    return df


def load_extra_baseline(directory: Path) -> pd.DataFrame:
    """Load ``baseline_metrics.csv`` from a reference-measurement directory.

    Args:
        directory: Directory containing ``baseline_metrics.csv``.

    Returns:
        The metrics with ``timestamp`` converted to timezone-aware UTC.

    Raises:
        FileNotFoundError: If the CSV does not exist (raised by ``pd.read_csv``).
    """
    return _read_metrics_csv(directory / "baseline_metrics.csv")


def load_workload_metrics(run_key: str) -> pd.DataFrame:
    """Load the resource metrics recorded for one workload run.

    Args:
        run_key: Key into ``RUNS``; the second element of the mapped tuple is
            used as the run's pipeline directory below ``ROOT``.

    Returns:
        The metrics with a UTC ``timestamp`` column and an added ``run`` column
        holding *run_key*.

    Raises:
        FileNotFoundError: If the run's ``baseline_metrics.csv`` does not exist.
    """
    _, pipeline_dir = RUNS[run_key]
    df = _read_metrics_csv(ROOT / pipeline_dir / "baseline_metrics.csv")
    df["run"] = run_key
    return df


def get_phase_boundaries(run_key: str) -> dict:
    """Read the phase markers of a run from its ground-truth (GT) CSV.

    The GT timestamps are stripped of any timezone, shifted by ``GT_OFFSET``
    and then labelled as UTC.

    Args:
        run_key: Key into ``RUNS``; the first element of the mapped tuple is
            the GT file path relative to ``ROOT``.

    Returns:
        A dict with

        * ``baseline_end``: timestamp of the first ``EXPERIMENT_BASELINE_END``
          row; if there is none, of the first ``START_ANOMALY`` /
          ``EXPERIMENT_CHAOS_START`` row; ``None`` if neither exists.
        * ``chaos_intervals``: list of ``(start_utc, end_utc, scenario)``
          tuples, one per ``EXPERIMENT_CHAOS_START`` row. If no matching end
          marker is found, ``end_utc`` equals ``start_utc``.
    """
    gt_file, _ = RUNS[run_key]
    gt = pd.read_csv(ROOT / gt_file, parse_dates=["timestamp"])
    gt["timestamp"] = (
        pd.to_datetime(gt["timestamp"]).dt.tz_localize(None) + GT_OFFSET
    ).dt.tz_localize("UTC")

    bl_end_rows = gt[gt["action"] == "EXPERIMENT_BASELINE_END"]
    if bl_end_rows.empty:
        # Fall back to the first anomaly / chaos start as the end of the baseline.
        bl_end_rows = gt[gt["action"].isin(["START_ANOMALY", "EXPERIMENT_CHAOS_START"])]
    baseline_end = bl_end_rows["timestamp"].iloc[0] if not bl_end_rows.empty else None

    chaos_starts = gt[gt["action"] == "EXPERIMENT_CHAOS_START"].copy()
    chaos_ends = gt[gt["action"] == "EXPERIMENT_CHAOS_END"].copy()
    # The scenario lookup below already tolerates a missing "workload_type"
    # column; the end-marker matching must do the same instead of raising
    # KeyError.
    has_workload_type = "workload_type" in gt.columns

    chaos_intervals = []
    for _, row in chaos_starts.iterrows():
        scenario = str(row.get("workload_type", row.get("scenario", "unknown"))).lower()
        t_start = row["timestamp"]

        is_match = chaos_ends["timestamp"] > t_start
        if has_workload_type:
            is_match &= chaos_ends["workload_type"] == row["workload_type"]
        matching_ends = chaos_ends[is_match]

        # First matching end marker in file order; degenerate interval otherwise.
        t_end = (
            matching_ends["timestamp"].iloc[0] if not matching_ends.empty else t_start
        )
        chaos_intervals.append((t_start, t_end, scenario))

    return {
        "baseline_end": baseline_end,
        "chaos_intervals": chaos_intervals,
    }


def split_by_phase(
    bm: pd.DataFrame,
    run_key: str,
    windows_df: pd.DataFrame,
    baseline_windows: pd.DataFrame,
) -> dict:
    """Split the metrics of a run into baseline and injection samples.

    Samples earlier than ``baseline_end − 30s`` count as baseline; samples
    inside any chaos interval (bounds inclusive) that are not baseline count as
    injection. All other samples are dropped.

    Args:
        bm: Metrics of the run with a ``timestamp`` column.
        run_key: Run identifier passed on to ``get_phase_boundaries``.
        windows_df: Unused; kept for interface compatibility.
        baseline_windows: Unused; kept for interface compatibility.

    Returns:
        ``{"baseline": DataFrame, "injection": DataFrame}``. Both frames always
        carry the columns of *bm*, even when empty. Without any baseline-end
        marker everything is treated as baseline.
    """
    boundaries = get_phase_boundaries(run_key)
    baseline_end = boundaries["baseline_end"]
    chaos_intervals = boundaries["chaos_intervals"]

    bm = bm.copy()
    ts = bm["timestamp"]

    if baseline_end is None:
        print(f" WARNUNG {run_key}: kein EXPERIMENT_BASELINE_END gefunden")
        # Empty slice instead of a column-less pd.DataFrame(): callers index
        # the result by column name, which would otherwise raise KeyError.
        return {"baseline": bm, "injection": bm.iloc[0:0]}

    # 30 s safety margin before the baseline end marker.
    baseline_safe_end = baseline_end - pd.Timedelta("30s")
    in_baseline = ts < baseline_safe_end

    in_injection = pd.Series(False, index=bm.index)
    for t_start, t_end, _ in chaos_intervals:
        in_injection |= (ts >= t_start) & (ts <= t_end)
    in_injection &= ~in_baseline

    n_bl = int(in_baseline.sum())
    n_inj = int(in_injection.sum())
    bm_min = ts.min().strftime("%H:%M")
    bm_max = ts.max().strftime("%H:%M")
    bl_end_h = baseline_end.strftime("%H:%M")
    print(
        f" {run_key}: bm={bm_min}-{bm_max} UTC, "
        f"baseline_end={bl_end_h} UTC → baseline={n_bl}, injection={n_inj}"
    )

    return {
        "baseline": bm[in_baseline],
        "injection": bm[in_injection],
    }


def compute_reference_stats(df: pd.DataFrame) -> dict:
    """Compute median and P95 of CPU and RAM for a reference measurement.

    Args:
        df: Frame with ``cpu_percent`` and ``memory_used_mb`` columns.

    Returns:
        Dict with ``cpu_median``, ``cpu_p95``, ``ram_median``, ``ram_p95``
        (floats) and the raw value arrays ``cpu_arr`` / ``ram_arr``.
    """
    return {
        "cpu_median": float(df["cpu_percent"].median()),
        "cpu_p95": float(df["cpu_percent"].quantile(0.95)),
        "cpu_arr": df["cpu_percent"].values,
        "ram_median": float(df["memory_used_mb"].median()),
        "ram_p95": float(df["memory_used_mb"].quantile(0.95)),
        "ram_arr": df["memory_used_mb"].values,
    }


def analyze_profile(
    profile_name: str,
    run_keys: list,
    windows_df: pd.DataFrame,
    baseline_windows: pd.DataFrame,
    sys_ref: dict,
    idle_ref: dict,
) -> dict:
    """Compute CPU and RAM differences for one workload profile.

    The baseline and injection samples of all runs of the profile are pooled.
    Transfer overhead is the pooled baseline minus the idle-pipeline median;
    chaos impact is the pooled injection samples minus the pooled baseline
    median.

    Args:
        profile_name: Label stored in the result under ``"profile"``.
        run_keys: Runs belonging to the profile; runs whose metrics file is
            missing are skipped.
        windows_df: Passed through to ``split_by_phase``.
        baseline_windows: Passed through to ``split_by_phase``.
        sys_ref: Unused; kept for interface compatibility.
        idle_ref: Reference stats of the idle pipeline
            (``cpu_median`` / ``ram_median`` are read).

    Returns:
        A dict of summary statistics plus underscore-prefixed raw arrays, or an
        empty dict when fewer than 5 baseline samples are available.
    """
    cpu_bl_abs, cpu_inj_abs = [], []
    ram_bl_abs, ram_inj_abs = [], []
    cpu_runs, ram_runs = {}, {}

    for rk in run_keys:
        try:
            bm = load_workload_metrics(rk)
        except FileNotFoundError:
            continue

        phases = split_by_phase(bm, rk, windows_df, baseline_windows)
        baseline, injection = phases["baseline"], phases["injection"]

        # Per-run medians feed the stability figures; skip runs without
        # baseline samples so no NaN median enters them.
        if not baseline.empty:
            cpu_runs[rk] = baseline["cpu_percent"].median()
            ram_runs[rk] = baseline["memory_used_mb"].median()

        cpu_bl_abs.extend(baseline["cpu_percent"].tolist())
        cpu_inj_abs.extend(injection["cpu_percent"].tolist())
        ram_bl_abs.extend(baseline["memory_used_mb"].tolist())
        ram_inj_abs.extend(injection["memory_used_mb"].tolist())

    if len(cpu_bl_abs) < 5:
        return {}

    cpu_run_vals = list(cpu_runs.values())
    ram_run_vals = list(ram_runs.values())
    cpu_stability = f"{np.mean(cpu_run_vals):.2f} ± {np.std(cpu_run_vals):.2f}"
    ram_stability = f"{np.mean(ram_run_vals):.1f} ± {np.std(ram_run_vals):.1f}"

    cpu_bl = np.array(cpu_bl_abs)
    cpu_inj = np.array(cpu_inj_abs)
    ram_bl = np.array(ram_bl_abs)
    ram_inj = np.array(ram_inj_abs)

    cpu_transfer_overhead = cpu_bl - idle_ref["cpu_median"]
    ram_transfer_overhead = ram_bl - idle_ref["ram_median"]

    cpu_chaos = (
        (cpu_inj - float(np.median(cpu_bl))) if len(cpu_inj) > 0 else np.array([])
    )
    ram_chaos = (
        (ram_inj - float(np.median(ram_bl))) if len(ram_inj) > 0 else np.array([])
    )

    return {
        "profile": profile_name,
        "n_bl": len(cpu_bl),
        "n_inj": len(cpu_inj),
        "cpu_stability": cpu_stability,
        "ram_stability": ram_stability,
        "cpu_bl_abs_median": float(np.median(cpu_bl)),
        "cpu_bl_abs_p95": float(np.percentile(cpu_bl, 95)),
        "ram_bl_abs_median": float(np.median(ram_bl)),
        "ram_bl_abs_p95": float(np.percentile(ram_bl, 95)),
        "cpu_transfer_med": float(np.median(cpu_transfer_overhead)),
        "cpu_transfer_p95": float(np.percentile(cpu_transfer_overhead, 95)),
        "ram_transfer_med": float(np.median(ram_transfer_overhead)),
        "ram_transfer_p95": float(np.percentile(ram_transfer_overhead, 95)),
        "cpu_chaos_med": float(np.nanmedian(cpu_chaos))
        if len(cpu_chaos) > 0
        else np.nan,
        "cpu_chaos_p95": float(np.nanpercentile(cpu_chaos, 95))
        if len(cpu_chaos) > 0
        else np.nan,
        "ram_chaos_med": float(np.nanmedian(ram_chaos))
        if len(ram_chaos) > 0
        else np.nan,
        "ram_chaos_p95": float(np.nanpercentile(ram_chaos, 95))
        if len(ram_chaos) > 0
        else np.nan,
        # Arrays for plotting
        "_cpu_bl_abs": cpu_bl,
        "_ram_bl_abs": ram_bl,
        "_cpu_transfer": cpu_transfer_overhead,
        "_ram_transfer": ram_transfer_overhead,
        "_cpu_chaos": cpu_chaos,
    }


def plot_stacked_overhead(
    sys_ref: dict, idle_ref: dict, pipeline_overhead: dict, results: list
):
    """Draw the stacked bar chart of the overhead layers and save it.

    Layers per profile (one panel for CPU, one for RAM):

    * Level 0: system baseline (grey)
    * Level 1: + pipeline overhead (green)
    * Level 2: + transfer overhead per profile (blue, clipped at 0)
    * Level 3: + chaos impact (red, drawn as an upward error bar)

    Args:
        sys_ref: Reference stats of the system baseline.
        idle_ref: Unused; kept for interface compatibility.
        pipeline_overhead: Dict with ``{cpu,ram}_overhead_median``.
        results: Output of ``analyze_profile`` per profile; empty dicts are
            skipped.
    """
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(13, 6))

    valid = [r for r in results if r]
    profiles = [r["profile"] for r in valid]
    x = np.arange(len(profiles))
    width = 0.55

    for ax, metric, unit in [
        (ax1, "cpu", "%"),
        (ax2, "ram", "MB"),
    ]:
        sys_val = sys_ref[f"{metric}_median"]
        pipeline_val = pipeline_overhead[f"{metric}_overhead_median"]

        sys_arr = np.full(len(x), sys_val)
        pipe_arr = np.full(len(x), pipeline_val)
        transfer_med = np.array(
            [r[f"{metric}_transfer_med"] for r in valid], dtype=float
        )
        chaos_med = np.array([r[f"{metric}_chaos_med"] for r in valid], dtype=float)
        # Profiles without injection data report NaN; treat them as "no impact".
        chaos_clean = np.where(np.isnan(chaos_med), 0.0, chaos_med)
        # Negative transfer overhead is not drawn as a negative bar segment.
        transfer_plot = np.maximum(transfer_med, 0.0)

        ax.bar(
            x,
            sys_arr,
            width,
            label=f"System-Baseline ({sys_val:.1f} {unit})",
            color="#B0BEC5",
            zorder=2,
        )
        ax.bar(
            x,
            pipe_arr,
            width,
            bottom=sys_arr,
            label=f"Pipeline-Overhead ({pipeline_val:+.1f} {unit})",
            color="#66BB6A",
            zorder=2,
        )
        ax.bar(
            x,
            transfer_plot,
            width,
            bottom=sys_arr + pipe_arr,
            label="Transfer-Overhead (Median, ≥0)",
            color="#42A5F5",
            zorder=2,
        )

        if np.any(chaos_clean > 0):
            totals = sys_arr + pipe_arr + transfer_plot
            ax.errorbar(
                x,
                totals,
                yerr=[np.zeros(len(x)), chaos_clean],
                fmt="none",
                color="firebrick",
                capsize=4,
                lw=1.5,
                label="Chaos-Impact (Median)",
            )
        else:
            ax.text(
                0.5,
                0.97,
                "Chaos-Impact: keine Injektionsdaten (Zeitfenster außerhalb Aufzeichnung)",
                transform=ax.transAxes,
                ha="center",
                va="top",
                fontsize=7,
                color="gray",
                bbox=dict(boxstyle="round,pad=0.3", fc="white", alpha=0.7),
            )

        ax.set_xticks(x)
        ax.set_xticklabels(profiles, rotation=15)
        ax.set_ylabel(
            f"{'CPU-Auslastung' if metric == 'cpu' else 'RAM-Verbrauch'} ({unit})"
        )
        ax.set_title(f"{'CPU' if metric == 'cpu' else 'RAM'}: Overhead-Schichten")
        ax.legend(fontsize=7.5, loc="upper left")
        ax.set_ylim(bottom=0)

    plt.suptitle(
        "Ressourcen-Overhead-Analyse: System → Pipeline → Transfer → Chaos", fontsize=11
    )
    plt.tight_layout()
    save_fig("03_resource_overhead_stacked")


def plot_cpu_overhead_timeseries(
    run_key: str, windows_df: pd.DataFrame, sys_ref: dict, idle_ref: dict
):
    """Plot the CPU OVERHEAD (not absolute values) of one run over time.

    The curve is ``cpu_percent − sys_ref["cpu_median"]``, resampled to 10 s
    medians. A dashed reference line marks the pipeline overhead
    (idle − system); injection windows of the run are shaded and labelled.

    Args:
        run_key: Run to plot; the function returns silently if its metrics
            file is missing.
        windows_df: Injection windows with ``run``, ``t_inj``, ``t_stop`` and
            ``scenario`` columns.
        sys_ref: Reference stats of the system baseline.
        idle_ref: Reference stats of the idle pipeline.
    """
    try:
        bm = load_workload_metrics(run_key)
    except FileNotFoundError:
        return

    inj = windows_df[windows_df["run"] == run_key].copy()
    inj["t_inj"] = inj["t_inj"].apply(_to_utc)
    inj["t_stop"] = inj["t_stop"].apply(_to_utc)

    bm["cpu_overhead"] = bm["cpu_percent"] - sys_ref["cpu_median"]
    pipeline_overhead_val = idle_ref["cpu_median"] - sys_ref["cpu_median"]

    fig, ax = plt.subplots(figsize=(14, 4))

    ts_r = bm.set_index("timestamp")["cpu_overhead"].resample("10s").median()
    ax.plot(
        ts_r.index,
        ts_r.values,
        color=PALETTE["pipeline"],
        lw=0.8,
        label="CPU-Overhead (workload − sys_baseline)",
    )
    ax.axhline(
        pipeline_overhead_val,
        color="#66BB6A",
        linestyle="--",
        lw=1.2,
        label=f"Pipeline-Overhead-Referenz ({pipeline_overhead_val:+.1f}%)",
    )
    ax.axhline(0, color="gray", linestyle=":", lw=0.7, alpha=0.5)

    # Label height: 15 % above the 99th percentile, so single spikes do not
    # dictate the axis range.
    ylim_top = float(ts_r.quantile(0.99)) * 1.15 if not ts_r.empty else 10
    for _, row in inj.iterrows():
        ax.axvspan(row["t_inj"], row["t_stop"], alpha=0.10, color=PALETTE["anomaly"])
        ax.text(
            row["t_inj"],
            ylim_top,
            row["scenario"],
            rotation=90,
            fontsize=5,
            va="top",
            color="firebrick",
            alpha=0.6,
        )

    ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M"))
    ax.set_xlabel("Zeit (UTC)")
    ax.set_ylabel("CPU-Overhead relativ zu System-Baseline (%)")
    ax.set_title(f"CPU-Overhead-Zeitreihe – {run_key}")
    ax.set_ylim(bottom=-5, top=max(15, ylim_top))
    ax.legend(loc="upper right", fontsize=8)
    save_fig(f"03_cpu_overhead_timeseries_{run_key}")


def print_and_save_table(
    sys_ref: dict, idle_ref: dict, pipeline_overhead: dict, results: list
):
    """Print the reference/overhead summary and write the table and stats files.

    Writes ``03_resource_table.csv`` and ``03_resource_stats.txt`` into
    ``OUTPUT_DIR``. REQ-NF01 is reported as met when the P95 pipeline CPU
    overhead is below 10.

    Args:
        sys_ref: Reference stats of the system baseline.
        idle_ref: Reference stats of the idle pipeline.
        pipeline_overhead: Dict with ``{cpu,ram}_overhead_{median,p95}``.
        results: Output of ``analyze_profile`` per profile; empty dicts are
            skipped.
    """
    print_section("Referenz-Messungen")
    print(
        f" system_baseline: "
        f"CPU={sys_ref['cpu_median']:.2f}% (P95={sys_ref['cpu_p95']:.2f}%), "
        f"RAM={sys_ref['ram_median']:.0f} MB (P95={sys_ref['ram_p95']:.0f} MB)"
    )
    print(
        f" pipeline_idle_baseline: "
        f"CPU={idle_ref['cpu_median']:.2f}% (P95={idle_ref['cpu_p95']:.2f}%), "
        f"RAM={idle_ref['ram_median']:.0f} MB (P95={idle_ref['ram_p95']:.0f} MB)"
    )

    print_section("Pipeline-Overhead (idle − system) → REQ-NF01")
    cpu_ov_med = pipeline_overhead["cpu_overhead_median"]
    cpu_ov_p95 = pipeline_overhead["cpu_overhead_p95"]
    ram_ov_med = pipeline_overhead["ram_overhead_median"]
    ram_ov_p95 = pipeline_overhead["ram_overhead_p95"]
    print(f" CPU Overhead: Median={cpu_ov_med:+.2f}%, P95={cpu_ov_p95:+.2f}%")
    print(f" RAM Overhead: Median={ram_ov_med:+.1f} MB, P95={ram_ov_p95:+.1f} MB")

    req_nf01 = cpu_ov_p95 < 10.0
    print(
        f" REQ-NF01 (Pipeline-CPU-Overhead < 10%): "
        f"{'OK' if req_nf01 else 'VERLETZT'} (P95={cpu_ov_p95:.2f}%)"
    )

    print_section(
        "Transfer-Overhead pro Workload-Profil (workload_baseline − pipeline_idle)"
    )
    rows = [
        {
            "Profil": r["profile"],
            "CPU Stab. (Med±Std)": r["cpu_stability"],
            "RAM Stab. (Med±Std)": r["ram_stability"],
            "CPU Transfer Med.(%)": f"{r['cpu_transfer_med']:+.2f}",
            "CPU Transfer P95(%)": f"{r['cpu_transfer_p95']:+.2f}",
            "RAM Transfer Med.(MB)": f"{r['ram_transfer_med']:+.1f}",
            "RAM Transfer P95(MB)": f"{r['ram_transfer_p95']:+.1f}",
        }
        for r in results
        if r
    ]

    tdf = pd.DataFrame(rows)
    print(tdf.to_string(index=False))
    tdf.to_csv(OUTPUT_DIR / "03_resource_table.csv", index=False)

    # Explicit UTF-8: the report contains "−" and "±", which the platform
    # default encoding (e.g. cp1252) cannot always represent.
    with open(OUTPUT_DIR / "03_resource_stats.txt", "w", encoding="utf-8") as f:
        f.write("Ressourcenverbrauch-Analyse (differenzbasiert)\n")
        f.write("=" * 65 + "\n\n")
        f.write(
            f"system_baseline: CPU={sys_ref['cpu_median']:.2f}%, "
            f"RAM={sys_ref['ram_median']:.0f} MB\n"
        )
        f.write(
            f"pipeline_idle: CPU={idle_ref['cpu_median']:.2f}%, "
            f"RAM={idle_ref['ram_median']:.0f} MB\n\n"
        )
        f.write("Pipeline-Overhead (idle − system):\n")
        f.write(f" CPU: Median={cpu_ov_med:+.2f}%, P95={cpu_ov_p95:+.2f}%\n")
        f.write(f" RAM: Median={ram_ov_med:+.1f} MB, P95={ram_ov_p95:+.1f} MB\n")
        f.write(f" REQ-NF01 (CPU Overhead): {'OK' if req_nf01 else 'VERLETZT'}\n\n")
        f.write("Transfer-Overhead pro Profil:\n")
        f.write(tdf.to_string(index=False))


def main():
    """Run the resource-overhead analysis end to end.

    Loads the injection/baseline windows and both reference measurements,
    derives the pipeline overhead, analyses every workload profile, and writes
    the table, the stats file and the plots. Returns early (after printing the
    error) if a reference measurement file is missing.
    """
    print_section("03 – Ressourcenverbrauch (REQ-NF01, NF04, NF05)")
    print(" Methode: Differenz-basierte Overhead-Schichten")
    print(" Pipeline-Overhead = pipeline_idle − system_baseline")
    print(" Transfer-Overhead = workload_baseline − pipeline_idle")
    print(" Chaos-Impact = workload_injection − workload_baseline\n")

    windows_df = pd.read_csv(
        OUTPUT_DIR / "injection_windows.csv", parse_dates=["t_inj", "t_stop"]
    )
    baseline_windows = pd.read_csv(
        OUTPUT_DIR / "baseline_windows.csv", parse_dates=["t_start", "t_end"]
    )
    for col in ["t_inj", "t_stop"]:
        windows_df[col] = windows_df[col].apply(_to_utc)
    for col in ["t_start", "t_end"]:
        baseline_windows[col] = baseline_windows[col].apply(_to_utc)

    try:
        sys_df = load_extra_baseline(SYSTEM_BASELINE_DIR)
        idle_df = load_extra_baseline(IDLE_BASELINE_DIR)
    except FileNotFoundError as e:
        print(f" FEHLER: {e}")
        return

    sys_ref = compute_reference_stats(sys_df)
    idle_ref = compute_reference_stats(idle_df)

    # The P95 overhead is measured against the system *median*, i.e. the idle
    # pipeline's P95 relative to the typical system level.
    pipeline_overhead = {
        "cpu_overhead_median": idle_ref["cpu_median"] - sys_ref["cpu_median"],
        "cpu_overhead_p95": idle_ref["cpu_p95"] - sys_ref["cpu_median"],
        "ram_overhead_median": idle_ref["ram_median"] - sys_ref["ram_median"],
        "ram_overhead_p95": idle_ref["ram_p95"] - sys_ref["ram_median"],
    }

    print(
        f" system_baseline: CPU={sys_ref['cpu_median']:.2f}%, "
        f"RAM={sys_ref['ram_median']:.0f} MB"
    )
    print(
        f" pipeline_idle: CPU={idle_ref['cpu_median']:.2f}%, "
        f"RAM={idle_ref['ram_median']:.0f} MB"
    )
    print(
        f" Pipeline-Overhead CPU: {pipeline_overhead['cpu_overhead_median']:+.2f}% "
        f"(P95={pipeline_overhead['cpu_overhead_p95']:+.2f}%)"
    )
    print(
        f" Pipeline-Overhead RAM: {pipeline_overhead['ram_overhead_median']:+.1f} MB "
        f"(P95={pipeline_overhead['ram_overhead_p95']:+.1f} MB)\n"
    )

    results = []
    for profile, run_keys in WORKLOAD_PROFILES.items():
        print(f" Profil: {profile}")
        r = analyze_profile(
            profile, run_keys, windows_df, baseline_windows, sys_ref, idle_ref
        )
        results.append(r)
        if r:
            print(
                f" Transfer-Overhead CPU: {r['cpu_transfer_med']:+.2f}% "
                f"(P95={r['cpu_transfer_p95']:+.2f}%)"
            )
            print(
                f" Transfer-Overhead RAM: {r['ram_transfer_med']:+.1f} MB "
                f"(P95={r['ram_transfer_p95']:+.1f} MB)"
            )
        # One representative time series per profile (its first run); a profile
        # without runs is skipped instead of raising IndexError.
        if run_keys:
            plot_cpu_overhead_timeseries(run_keys[0], windows_df, sys_ref, idle_ref)

    print_and_save_table(sys_ref, idle_ref, pipeline_overhead, results)
    plot_stacked_overhead(sys_ref, idle_ref, pipeline_overhead, results)

    print(
        "\n→ Gespeichert: 03_resource_table.csv, "
        "03_resource_overhead_stacked.pdf/png, "
        "03_cpu_overhead_timeseries_*.pdf/png"
    )


if __name__ == "__main__":
    main()