"""
01_ground_truth.py – ground-truth preparation and injection windows
====================================================================

Reads the ground-truth (GT) CSVs of the scenario generator and extracts
injection windows [t_inj, t_stop] per scenario and run.

Output:
- output/injection_windows.csv
- output/baseline_windows.csv
- output/phase_windows.csv (only when phase markers were found)
- output/01_gt_summary.txt
"""

import sys

import pandas as pd

sys.path.insert(0, ".")

# Project convention: the shared config module supplies RUNS, ROOT,
# OUTPUT_DIR, SCENARIO_IDS and print_section used below.
from config import *  # noqa: E402,F401,F403

# Shift applied to every GT timestamp (host clock UTC+1 -> VM clock UTC).
GT_OFFSET = pd.Timedelta("-1h")

# Fixed column layouts so that empty results still carry a usable schema.
_WINDOW_COLUMNS = ["run", "scenario", "t_inj", "t_stop", "duration_s"]
_BASELINE_COLUMNS = ["run", "t_start", "t_end", "duration_s"]
_PHASE_COLUMNS = ["run", "phase", "t_start", "t_end", "duration_s"]


def load_gt_corrected(run_key: str) -> pd.DataFrame:
    """Load the GT CSV of *run_key* and correct the one-hour time offset.

    The ``timestamp`` column is stripped of any timezone, shifted by
    ``GT_OFFSET`` and then marked as UTC. A ``run`` column holding
    *run_key* is added.

    Raises:
        KeyError: if *run_key* is not a key of ``RUNS``.
        FileNotFoundError: if the GT file does not exist.
    """
    gt_file, _ = RUNS[run_key]
    df = pd.read_csv(ROOT / gt_file, parse_dates=["timestamp"])
    naive = pd.to_datetime(df["timestamp"]).dt.tz_localize(None)
    df["timestamp"] = (naive + GT_OFFSET).dt.tz_localize("UTC")
    df["run"] = run_key
    return df


def _workload_label(row: pd.Series) -> str:
    """Return the lower-cased ``workload_type`` of *row*, or "" if missing.

    A NaN cell must not be stringified: ``str(nan)`` is ``"nan"``, which
    would otherwise be reported as a scenario name.
    """
    value = row.get("workload_type", "")
    if pd.isna(value):
        return ""
    return str(value).lower()


def _make_window(run_key: str, scenario: str, t_inj, t_stop) -> dict:
    """Build one injection-window record including its duration in seconds."""
    return {
        "run": run_key,
        "scenario": scenario,
        "t_inj": t_inj,
        "t_stop": t_stop,
        "duration_s": (t_stop - t_inj).total_seconds(),
    }


def extract_injection_windows(run_key: str) -> pd.DataFrame:
    """Extract injection windows from START_ANOMALY / STOP_ANOMALY pairs.

    Strategy:
      1. Every START_ANOMALY row provides ``t_inj``.
      2. The next STOP_ANOMALY row with the same ``workload_type`` provides
         ``t_stop``. Without such a row the next START_ANOMALY is used, and
         failing that the last timestamp of the run.
      3. The scenario ID is derived from ``workload_type``.

    If the run contains no START_ANOMALY rows at all, contiguous
    ``is_anomaly == 1`` blocks are used instead.

    Returns:
        DataFrame with columns run, scenario, t_inj, t_stop, duration_s.
    """
    df = load_gt_corrected(run_key)
    df = df.sort_values("timestamp").reset_index(drop=True)

    starts = df[df["action"] == "START_ANOMALY"]
    stops = df[df["action"] == "STOP_ANOMALY"]

    if starts.empty:
        print(
            f" INFO {run_key}: keine START_ANOMALY-Zeilen, "
            f"nutze is_anomaly-Blöcke als Fallback"
        )
        return _extract_from_anomaly_blocks(df, run_key)

    # Loop-invariant: last resort for a START without any later event.
    last_timestamp = df["timestamp"].iloc[-1]

    windows = []
    for _, start_row in starts.iterrows():
        t_inj = start_row["timestamp"]

        matching_stops = stops[
            (stops["workload_type"] == start_row["workload_type"])
            & (stops["timestamp"] > t_inj)
        ]
        if not matching_stops.empty:
            t_stop = matching_stops["timestamp"].iloc[0]
        else:
            later_starts = starts[starts["timestamp"] > t_inj]
            t_stop = (
                later_starts["timestamp"].iloc[0]
                if not later_starts.empty
                else last_timestamp
            )

        scenario = _infer_scenario(_workload_label(start_row))
        windows.append(_make_window(run_key, scenario, t_inj, t_stop))

    return pd.DataFrame(windows, columns=_WINDOW_COLUMNS)


def _infer_scenario(workload_type: str) -> str:
    """Derive the scenario ID from a ``workload_type`` string.

    Matching is case-insensitive and treats ``_`` and ``-`` as equal. Known
    IDs from ``SCENARIO_IDS`` are tried first, then a table of keyword
    fragments (in insertion order). If nothing matches, the normalised
    input is returned, or "unknown" for an empty input.
    """
    wt = workload_type.lower().replace("_", "-")

    for sid in SCENARIO_IDS:
        # Normalise the ID exactly like the input; comparing against an
        # underscore spelling could never match once wt has none left.
        if sid.lower().replace("_", "-") in wt:
            return sid

    keyword_to_scenario = {
        "slow": "slow-connection",
        "latency": "high-latency",
        "loss": "packet-loss",
        "cong": "congestion",
        "outage": "partial-outage",
        "flap": "flapping",
        "cpu": "cpu-stress",
        "io": "io-stress",
        "mem": "mem-stress",
    }
    for keyword, sid in keyword_to_scenario.items():
        if keyword in wt:
            return sid

    return wt if wt else "unknown"


def _extract_from_anomaly_blocks(df: pd.DataFrame, run_key: str) -> pd.DataFrame:
    """Fallback: turn contiguous ``is_anomaly == 1`` blocks into windows.

    Rows are grouped per inferred scenario; within a scenario a new window
    starts whenever two consecutive anomaly rows are more than 90 seconds
    apart.

    Returns:
        DataFrame with columns run, scenario, t_inj, t_stop, duration_s
        (empty, but with these columns, when there are no anomaly rows).
    """
    max_gap = pd.Timedelta("90s")

    anomaly_rows = df[df["is_anomaly"] == 1].sort_values("timestamp").copy()
    if anomaly_rows.empty:
        return pd.DataFrame(columns=_WINDOW_COLUMNS)

    anomaly_rows["scenario"] = anomaly_rows.apply(
        lambda row: _infer_scenario(_workload_label(row)), axis=1
    )

    windows = []
    for scenario, group in anomaly_rows.groupby("scenario"):
        timestamps = group.sort_values("timestamp")["timestamp"]
        block_start = timestamps.iloc[0]
        previous = block_start
        for ts in timestamps.iloc[1:]:
            if ts - previous > max_gap:
                # Gap too large: close the current block, open a new one.
                windows.append(
                    _make_window(run_key, scenario, block_start, previous)
                )
                block_start = ts
            previous = ts
        # Close the block that is still open at the end of the group.
        windows.append(_make_window(run_key, scenario, block_start, previous))

    return pd.DataFrame(windows, columns=_WINDOW_COLUMNS)


def extract_baseline_windows(run_key: str) -> pd.DataFrame:
    """Return the baseline window of a run.

    Baseline rows are those with ``scenario_mode == 'training'`` and
    ``is_anomaly == 0``. The window reaches from the first to the last
    such row.

    Returns:
        DataFrame with columns run, t_start, t_end, duration_s; one row,
        or zero rows when the run has no baseline rows.
    """
    df = load_gt_corrected(run_key).sort_values("timestamp")

    baseline = df[(df["scenario_mode"] == "training") & (df["is_anomaly"] == 0)]
    if baseline.empty:
        return pd.DataFrame(columns=_BASELINE_COLUMNS)

    t_start = baseline["timestamp"].iloc[0]
    t_end = baseline["timestamp"].iloc[-1]
    record = {
        "run": run_key,
        "t_start": t_start,
        "t_end": t_end,
        "duration_s": (t_end - t_start).total_seconds(),
    }
    return pd.DataFrame([record], columns=_BASELINE_COLUMNS)


def extract_phase_windows(run_key: str) -> pd.DataFrame:
    """Extract workload phases from START_PHASE_* / END_PHASE_* markers.

    Each START_PHASE_<X> row is paired with the next END_PHASE_<X> row.
    Without a matching end marker the timestamp of the next later row is
    used, and failing that the last timestamp of the run.

    Returns:
        DataFrame with columns run, phase, t_start, t_end, duration_s.
    """
    start_prefix = "START_PHASE_"

    df = load_gt_corrected(run_key)
    df = df.sort_values("timestamp").reset_index(drop=True)

    starts = df[df["action"].str.startswith(start_prefix, na=False)]
    ends = df[df["action"].str.startswith("END_PHASE_", na=False)]

    windows = []
    for _, start_row in starts.iterrows():
        # Strip only the leading marker; the row is known to start with it.
        phase_type = start_row["action"][len(start_prefix):]
        t_start = start_row["timestamp"]

        matching_ends = ends[
            (ends["action"] == f"END_PHASE_{phase_type}")
            & (ends["timestamp"] > t_start)
        ]
        if not matching_ends.empty:
            t_end = matching_ends["timestamp"].iloc[0]
        else:
            later_events = df[df["timestamp"] > t_start]
            t_end = (
                later_events["timestamp"].iloc[0]
                if not later_events.empty
                else df["timestamp"].iloc[-1]
            )

        windows.append(
            {
                "run": run_key,
                "phase": phase_type,
                "t_start": t_start,
                "t_end": t_end,
                "duration_s": (t_end - t_start).total_seconds(),
            }
        )

    return pd.DataFrame(windows, columns=_PHASE_COLUMNS)


def _format_offset(offset: pd.Timedelta) -> str:
    """Render a timedelta as signed hours, e.g. ``-1h``.

    The default string form of a negative Timedelta is
    ``-1 days +23:00:00``, which is misleading in a report.
    """
    hours = offset.total_seconds() / 3600
    return f"{hours:+g}h"


def _concat_frames(frames: list, columns: list) -> pd.DataFrame:
    """Concatenate the non-empty *frames*; fall back to an empty frame.

    ``pd.concat`` raises ``ValueError`` on an empty list, which happens
    when no GT file could be loaded at all.
    """
    non_empty = [frame for frame in frames if not frame.empty]
    if not non_empty:
        return pd.DataFrame(columns=columns)
    return pd.concat(non_empty, ignore_index=True)


def _report_summary(windows_df: pd.DataFrame, offset_label: str) -> None:
    """Print the per-scenario statistics and write ``01_gt_summary.txt``."""
    print_section("Injektionsfenster pro Szenario")
    summary = (
        windows_df.groupby("scenario")
        .agg(
            n_windows=("run", "count"),
            dur_mean_s=("duration_s", "mean"),
            dur_std_s=("duration_s", "std"),
            dur_total_min=("duration_s", lambda x: x.sum() / 60),
        )
        .round(1)
    )
    print(summary.to_string())

    print_section("Sanity-Check Fensterlängen")
    for scenario, row in summary.iterrows():
        if row["dur_mean_s"] < 5:
            print(
                f" WARNUNG: {scenario} – mittlere Dauer {row['dur_mean_s']:.1f}s "
                f"sehr kurz (START/STOP-Paare korrekt?)"
            )
        else:
            # iterrows() upcasts the row to float; show the count as an int.
            print(
                f" OK: {scenario} – {row['dur_mean_s']:.0f}s mittlere Dauer, "
                f"{int(row['n_windows'])} Fenster"
            )

    with open(OUTPUT_DIR / "01_gt_summary.txt", "w", encoding="utf-8") as f:
        f.write(f"GT-Zeitversatz-Korrektur: {offset_label}\n")
        f.write(summary.to_string())
        f.write("\n\nAlle Injektionsfenster:\n")
        f.write(windows_df.to_string())


def main():
    """Extract all windows for every run in ``RUNS`` and write the outputs.

    Runs whose GT file is missing are skipped with a warning. Phase
    extraction is best-effort: any error is reported and the run skipped.
    """
    print_section("01 – Ground-Truth-Aufbereitung")
    offset_label = _format_offset(GT_OFFSET)
    print(
        f" Zeitversatz-Korrektur: GT-Zeitstempel {offset_label} "
        f"(Host UTC+1 → VM UTC, Subtraktion 1h)\n"
    )

    all_windows, all_baselines = [], []
    for run_key in RUNS:
        try:
            w = extract_injection_windows(run_key)
            b = extract_baseline_windows(run_key)
        except FileNotFoundError as e:
            print(f" WARNUNG – {run_key}: {e}")
            continue
        all_windows.append(w)
        all_baselines.append(b)
        base_min = b["duration_s"].sum() / 60 if not b.empty else 0
        print(
            f" {run_key}: {len(w)} Injektionsfenster, Baseline {base_min:.1f} min"
        )

    windows_df = _concat_frames(all_windows, _WINDOW_COLUMNS)
    baselines_df = _concat_frames(all_baselines, _BASELINE_COLUMNS)
    windows_df.to_csv(OUTPUT_DIR / "injection_windows.csv", index=False)
    baselines_df.to_csv(OUTPUT_DIR / "baseline_windows.csv", index=False)

    all_phases = []
    for run_key in RUNS:
        try:
            p = extract_phase_windows(run_key)
        except Exception as e:  # best-effort: phases are optional output
            print(f" WARNUNG Phasen {run_key}: {e}")
            continue
        if not p.empty:
            all_phases.append(p)
    if all_phases:
        phases_df = pd.concat(all_phases, ignore_index=True)
        phases_df.to_csv(OUTPUT_DIR / "phase_windows.csv", index=False)
        print(f" → {len(phases_df)} Phasen-Fenster extrahiert")

    if windows_df.empty:
        # Nothing to aggregate; grouping an empty frame would only fail.
        print(
            " WARNUNG: keine Injektionsfenster gefunden – "
            "Zusammenfassung übersprungen"
        )
    else:
        _report_summary(windows_df, offset_label)

    print("\n→ Gespeichert: injection_windows.csv, baseline_windows.csv")


if __name__ == "__main__":
    main()