"""
01_ground_truth.py – ground-truth preparation and injection windows
===================================================================

Reads the GT CSVs written by the scenario generator and extracts injection
windows [t_inj, t_stop] per scenario and run. GT timestamps are shifted by
GT_OFFSET before any window is computed.

Outputs:
  - output/injection_windows.csv
  - output/baseline_windows.csv
  - output/phase_windows.csv  (only if the GT contains phase markers)
  - output/01_gt_summary.txt
"""

import sys

import pandas as pd

# Make config.py in the current working directory importable.
sys.path.insert(0, ".")

# Supplies ROOT, RUNS, SCENARIO_IDS, OUTPUT_DIR and print_section used below.
from config import *  # noqa: E402,F403

# GT timestamps are logged in host time (UTC+1); subtracting one hour aligns
# them with the VM's UTC timestamps.
GT_OFFSET = pd.Timedelta("-1h")


def load_gt_corrected(run_key: str) -> pd.DataFrame:
    """Load the ground-truth CSV of *run_key* with the 1 h clock offset removed.

    The CSV path is the first element of ``RUNS[run_key]``, resolved against
    ``ROOT``. Timestamps are made timezone-naive (an existing timezone is
    dropped while keeping wall-clock time), shifted by ``GT_OFFSET`` and then
    labelled as UTC. A ``run`` column holding *run_key* is appended.
    """
    gt_name, _unused = RUNS[run_key]
    frame = pd.read_csv(ROOT / gt_name, parse_dates=["timestamp"])

    wall_clock = pd.to_datetime(frame["timestamp"]).dt.tz_localize(None)
    frame["timestamp"] = (wall_clock + GT_OFFSET).dt.tz_localize("UTC")

    frame["run"] = run_key
    return frame


def extract_injection_windows(run_key: str) -> pd.DataFrame:
    """Extract injection windows from START_ANOMALY / STOP_ANOMALY pairs.

    Strategy:
      1. Every START_ANOMALY row yields ``t_inj``.
      2. ``t_stop`` is the first STOP_ANOMALY after ``t_inj`` with the same
         ``workload_type``. Without one, the next START_ANOMALY (any workload
         type) is used, or the last GT timestamp if no START follows.
      3. The scenario ID is derived from ``workload_type``.

    If the run has no START_ANOMALY rows at all, contiguous ``is_anomaly``
    blocks are used instead (``_extract_from_anomaly_blocks``).

    Args:
        run_key: Key of the run in ``RUNS``.

    Returns:
        DataFrame with columns run, scenario, t_inj, t_stop, duration_s.
    """
    df = load_gt_corrected(run_key)
    df = df.sort_values("timestamp").reset_index(drop=True)

    is_start = df["action"] == "START_ANOMALY"
    is_stop = df["action"] == "STOP_ANOMALY"

    if not is_start.any():
        print(
            f" INFO {run_key}: keine START_ANOMALY-Zeilen, "
            f"nutze is_anomaly-Blöcke als Fallback"
        )
        return _extract_from_anomaly_blocks(df, run_key)

    # Normalise workload_type once. A missing column or NaN cell becomes "",
    # so NaN START rows still pair with NaN STOP rows (NaN == NaN is False),
    # and the scenario is not inferred from the literal string "nan".
    if "workload_type" in df.columns:
        workload = df["workload_type"].fillna("").astype(str)
    else:
        workload = pd.Series("", index=df.index)

    start_times = df.loc[is_start, "timestamp"]
    stop_times = df.loc[is_stop, "timestamp"]
    stop_workload = workload[is_stop]
    last_ts = df["timestamp"].iloc[-1]

    windows = []
    for idx, t_inj in start_times.items():
        wt = workload[idx]
        # df is sorted by timestamp, so the first match is the earliest STOP.
        same_type_stops = stop_times[(stop_workload == wt) & (stop_times > t_inj)]
        if not same_type_stops.empty:
            t_stop = same_type_stops.iloc[0]
        else:
            later_starts = start_times[start_times > t_inj]
            t_stop = later_starts.iloc[0] if not later_starts.empty else last_ts

        windows.append(
            {
                "run": run_key,
                "scenario": _infer_scenario(wt.lower()),
                "t_inj": t_inj,
                "t_stop": t_stop,
                "duration_s": (t_stop - t_inj).total_seconds(),
            }
        )

    return pd.DataFrame(windows)


def _infer_scenario(workload_type: str, scenario_ids=None) -> str:
    """Derive a scenario ID from a GT ``workload_type`` string.

    Matching is case-insensitive and treats ``_`` and ``-`` as equivalent.
    First, any scenario ID contained in *workload_type* wins (checked in the
    order of *scenario_ids*). Otherwise a fixed keyword table maps fragments
    such as ``"cpu"`` or ``"loss"`` to scenario IDs. If nothing matches, the
    normalised *workload_type* is returned, or ``"unknown"`` if it is empty.

    Args:
        workload_type: Raw workload type from the GT CSV.
        scenario_ids: Candidate scenario IDs; defaults to ``SCENARIO_IDS``
            from the project config.

    Returns:
        The matched scenario ID (spelled as in *scenario_ids* or the keyword
        table), otherwise the normalised input, or ``"unknown"``.
    """
    if scenario_ids is None:
        scenario_ids = SCENARIO_IDS

    wt = workload_type.lower().replace("_", "-")

    for sid in scenario_ids:
        # Normalise the ID exactly like the input. Checking
        # sid.replace("-", "_") against wt can never match, because wt has
        # no underscores left, so underscore-spelled IDs were never found.
        if sid.lower().replace("_", "-") in wt:
            return sid

    # Substring matching in this order: e.g. "congestion" contains "io",
    # so "cong" must be tested before "io".
    keyword_scenarios = (
        ("slow", "slow-connection"),
        ("latency", "high-latency"),
        ("loss", "packet-loss"),
        ("cong", "congestion"),
        ("outage", "partial-outage"),
        ("flap", "flapping"),
        ("cpu", "cpu-stress"),
        ("io", "io-stress"),
        ("mem", "mem-stress"),
    )
    for keyword, sid in keyword_scenarios:
        if keyword in wt:
            return sid

    return wt if wt else "unknown"


def _extract_from_anomaly_blocks(
    df: pd.DataFrame, run_key: str, gap: pd.Timedelta = pd.Timedelta("90s")
) -> pd.DataFrame:
    """Fallback: turn contiguous ``is_anomaly == 1`` rows into injection windows.

    Anomalous rows are grouped by the scenario inferred from their
    ``workload_type``. Within a scenario, consecutive rows at most *gap*
    apart belong to the same window; a larger jump starts a new one. A window
    spans from its first to its last anomalous row, so a single-row block has
    ``duration_s == 0``.

    Args:
        df: Offset-corrected GT frame with ``timestamp`` and ``is_anomaly``
            columns; ``workload_type`` is optional.
        run_key: Run identifier written to the ``run`` column.
        gap: Largest timestamp difference still treated as the same block.

    Returns:
        DataFrame with columns run, scenario, t_inj, t_stop, duration_s;
        zero rows (same columns) if there are no anomalous rows.
    """
    columns = ["run", "scenario", "t_inj", "t_stop", "duration_s"]

    anomaly_rows = df[df["is_anomaly"] == 1].sort_values("timestamp")
    if anomaly_rows.empty:
        # Keep the schema: callers concatenate these frames and group by "scenario".
        return pd.DataFrame(columns=columns)

    # Missing column / NaN cells -> "" (inferred as "unknown") instead of "nan".
    if "workload_type" in anomaly_rows.columns:
        workload = anomaly_rows["workload_type"].fillna("").astype(str).str.lower()
    else:
        workload = pd.Series("", index=anomaly_rows.index)
    scenarios = workload.map(_infer_scenario)

    windows = []

    def close_window(scenario, t_start, t_end):
        """Record one finished block as a window row."""
        windows.append(
            {
                "run": run_key,
                "scenario": scenario,
                "t_inj": t_start,
                "t_stop": t_end,
                "duration_s": (t_end - t_start).total_seconds(),
            }
        )

    for scenario, timestamps in anomaly_rows["timestamp"].groupby(scenarios):
        timestamps = timestamps.sort_values()
        t_start = t_prev = timestamps.iloc[0]
        for ts in timestamps.iloc[1:]:
            if ts - t_prev > gap:
                close_window(scenario, t_start, t_prev)
                t_start = ts
            t_prev = ts
        close_window(scenario, t_start, t_prev)

    return pd.DataFrame(windows, columns=columns)


def extract_baseline_windows(run_key: str) -> pd.DataFrame:
    """Return the baseline span of *run_key*.

    Baseline rows are those with ``scenario_mode == "training"`` and
    ``is_anomaly == 0``. The window runs from the first to the last such row.

    NOTE(review): rows that are not baseline but lie between the first and
    last baseline row are included in the span; confirm that training rows
    are contiguous in the GT.

    Args:
        run_key: Key of the run in ``RUNS``.

    Returns:
        DataFrame with columns run, t_start, t_end, duration_s: one row, or
        zero rows (same columns) when the run has no baseline rows.
    """
    columns = ["run", "t_start", "t_end", "duration_s"]

    df = load_gt_corrected(run_key).sort_values("timestamp")
    is_baseline = (df["scenario_mode"] == "training") & (df["is_anomaly"] == 0)
    baseline_ts = df.loc[is_baseline, "timestamp"]

    if baseline_ts.empty:
        # Keep the schema so the concatenated baseline_windows.csv has a header.
        return pd.DataFrame(columns=columns)

    t_start = baseline_ts.iloc[0]
    t_end = baseline_ts.iloc[-1]
    return pd.DataFrame(
        [
            {
                "run": run_key,
                "t_start": t_start,
                "t_end": t_end,
                "duration_s": (t_end - t_start).total_seconds(),
            }
        ],
        columns=columns,
    )


def extract_phase_windows(run_key: str) -> pd.DataFrame:
    """Extract workload phases (e.g. BW, IOPS, BATCH) from the GT of *run_key*.

    A ``START_PHASE_<X>`` row opens phase ``<X>``; it is closed by the first
    ``END_PHASE_<X>`` row after it. If no such END row exists, the phase is
    closed at the next ``START_PHASE_*`` row (any phase type), or at the last
    GT timestamp when no further phase starts.

    Args:
        run_key: Key of the run in ``RUNS``.

    Returns:
        DataFrame with columns run, phase, t_start, t_end, duration_s; zero
        rows (same columns) if the run contains no phase start markers.
    """
    start_prefix = "START_PHASE_"
    end_prefix = "END_PHASE_"
    columns = ["run", "phase", "t_start", "t_end", "duration_s"]

    df = load_gt_corrected(run_key)
    df = df.sort_values("timestamp").reset_index(drop=True)

    starts = df[df["action"].str.startswith(start_prefix, na=False)]
    ends = df[df["action"].str.startswith(end_prefix, na=False)]
    if starts.empty:
        return pd.DataFrame(columns=columns)

    start_times = starts["timestamp"]
    last_ts = df["timestamp"].iloc[-1]

    windows = []
    for action, t_start in zip(starts["action"], start_times):
        phase_type = action[len(start_prefix):]

        matching_ends = ends.loc[
            (ends["action"] == end_prefix + phase_type) & (ends["timestamp"] > t_start),
            "timestamp",
        ]
        if not matching_ends.empty:
            t_end = matching_ends.iloc[0]
        else:
            # No END marker: close at the next phase start. Using the next GT
            # row of any kind would end the phase at whatever unrelated event
            # or sample happens to follow t_start.
            later_starts = start_times[start_times > t_start]
            t_end = later_starts.iloc[0] if not later_starts.empty else last_ts

        windows.append(
            {
                "run": run_key,
                "phase": phase_type,
                "t_start": t_start,
                "t_end": t_end,
                "duration_s": (t_end - t_start).total_seconds(),
            }
        )

    return pd.DataFrame(windows, columns=columns)


def _format_offset(offset: pd.Timedelta) -> str:
    """Render *offset* as signed hours, e.g. ``"-1h"``.

    ``str(pd.Timedelta("-1h"))`` is ``"-1 days +23:00:00"``, which is
    misleading in the console log and the summary file.
    """
    return f"{offset.total_seconds() / 3600:+g}h"


def _concat_or_empty(frames: list, columns: list) -> pd.DataFrame:
    """Concatenate the non-empty *frames*, or return an empty frame with *columns*.

    ``pd.concat([])`` raises ``ValueError`` (e.g. when every run's GT file is
    missing), and column-less empty frames would break later ``groupby`` calls.
    """
    non_empty = [frame for frame in frames if not frame.empty]
    if not non_empty:
        return pd.DataFrame(columns=columns)
    return pd.concat(non_empty, ignore_index=True)


def _collect_run_windows():
    """Extract injection and baseline windows for every run in ``RUNS``.

    Runs whose GT file is missing are reported and skipped.

    Returns:
        Tuple ``(windows_df, baselines_df)``.
    """
    all_windows, all_baselines = [], []
    for run_key in RUNS:
        try:
            w = extract_injection_windows(run_key)
            b = extract_baseline_windows(run_key)
        except FileNotFoundError as e:
            print(f" WARNUNG – {run_key}: {e}")
            continue

        all_windows.append(w)
        all_baselines.append(b)
        base_min = b["duration_s"].sum() / 60 if not b.empty else 0
        print(f" {run_key}: {len(w)} Injektionsfenster, Baseline {base_min:.1f} min")

    windows_df = _concat_or_empty(
        all_windows, ["run", "scenario", "t_inj", "t_stop", "duration_s"]
    )
    baselines_df = _concat_or_empty(
        all_baselines, ["run", "t_start", "t_end", "duration_s"]
    )
    return windows_df, baselines_df


def _write_phase_windows() -> None:
    """Extract phase windows for every run and write phase_windows.csv if any exist.

    Best-effort: any error for a run is reported and that run is skipped.
    """
    all_phases = []
    for run_key in RUNS:
        try:
            p = extract_phase_windows(run_key)
        except Exception as e:  # deliberate best-effort; phases are optional output
            print(f" WARNUNG Phasen {run_key}: {e}")
            continue
        if not p.empty:
            all_phases.append(p)

    if all_phases:
        phases_df = pd.concat(all_phases, ignore_index=True)
        phases_df.to_csv(OUTPUT_DIR / "phase_windows.csv", index=False)
        print(f" → {len(phases_df)} Phasen-Fenster extrahiert")


def _summarize_windows(windows_df: pd.DataFrame) -> pd.DataFrame:
    """Aggregate injection windows per scenario.

    Columns: window count, mean/std duration in seconds, total duration in
    minutes; all rounded to one decimal.
    """
    if windows_df.empty:
        return pd.DataFrame(
            columns=["n_windows", "dur_mean_s", "dur_std_s", "dur_total_min"]
        ).rename_axis("scenario")
    return (
        windows_df.groupby("scenario")
        .agg(
            n_windows=("run", "count"),
            dur_mean_s=("duration_s", "mean"),
            dur_std_s=("duration_s", "std"),
            dur_total_min=("duration_s", lambda x: x.sum() / 60),
        )
        .round(1)
    )


def main():
    """Build GT windows for all runs and write them, plus a summary, to OUTPUT_DIR.

    Writes injection_windows.csv, baseline_windows.csv and 01_gt_summary.txt.
    Also writes phase_windows.csv when at least one run has phase markers.
    """
    print_section("01 – Ground-Truth-Aufbereitung")
    offset_txt = _format_offset(GT_OFFSET)
    print(
        f" Zeitversatz-Korrektur: GT-Zeitstempel {offset_txt} "
        f"(Host UTC+1 → VM UTC, Subtraktion 1h)\n"
    )

    windows_df, baselines_df = _collect_run_windows()
    windows_df.to_csv(OUTPUT_DIR / "injection_windows.csv", index=False)
    baselines_df.to_csv(OUTPUT_DIR / "baseline_windows.csv", index=False)

    _write_phase_windows()

    print_section("Injektionsfenster pro Szenario")
    summary = _summarize_windows(windows_df)
    print(summary.to_string())

    print_section("Sanity-Check Fensterlängen")
    if windows_df.empty:
        print(" WARNUNG: keine Injektionsfenster gefunden")
    for scenario, row in summary.iterrows():
        mean_s = row["dur_mean_s"]
        if mean_s < 5:
            print(
                f" WARNUNG: {scenario} – mittlere Dauer {mean_s:.1f}s "
                f"sehr kurz (START/STOP-Paare korrekt?)"
            )
        else:
            # iterrows() upcasts the mixed-dtype row to float; cast the count back.
            print(
                f" OK: {scenario} – {mean_s:.0f}s mittlere Dauer, "
                f"{int(row['n_windows'])} Fenster"
            )

    with open(OUTPUT_DIR / "01_gt_summary.txt", "w", encoding="utf-8") as f:
        f.write(f"GT-Zeitversatz-Korrektur: {offset_txt}\n")
        f.write(summary.to_string())
        f.write("\n\nAlle Injektionsfenster:\n")
        f.write(windows_df.to_string())

    print("\n→ Gespeichert: injection_windows.csv, baseline_windows.csv")


if "__main__" == __name__:
    # Run the pipeline only when executed as a script, not when imported.
    main()