bachelor-thesis/evaluation/01_ground_truth.py

294 lines
9.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
01_ground_truth.py - Ground-truth preparation and injection windows
=====================================================================
Reads the GT CSVs produced by the scenario generator and extracts
injection windows [t_inj, t_stop] per scenario and run.
Output:
- output/injection_windows.csv
- output/baseline_windows.csv
- output/phase_windows.csv (only written when phase windows were found)
- output/01_gt_summary.txt
"""
import sys
# Put the current working directory first on the import path; presumably so
# that the local `config` module resolves when the script is run from its
# own folder.
sys.path.insert(0, ".")
# Star import: pd, RUNS, ROOT, OUTPUT_DIR, SCENARIO_IDS and print_section are
# used below without any other import, so they are expected to come from here.
from config import *
# Shift added to every ground-truth timestamp in load_gt_corrected(). The
# banner printed by main() describes it as the host (UTC+1) to VM (UTC)
# correction, i.e. subtracting one hour.
GT_OFFSET = pd.Timedelta("-1h")
def load_gt_corrected(run_key: str) -> pd.DataFrame:
    """Load the ground-truth CSV of one run and apply the GT_OFFSET shift.

    Args:
        run_key: Key into RUNS. The looked-up value is unpacked as a pair
            whose first item is the GT file path relative to ROOT.

    Returns:
        The CSV contents with ``timestamp`` shifted by GT_OFFSET and
        localized to UTC, plus a ``run`` column holding ``run_key``.
    """
    gt_name, _other = RUNS[run_key]
    frame = pd.read_csv(ROOT / gt_name, parse_dates=["timestamp"])

    # Strip any timezone first so the offset is applied to wall-clock
    # values, then label the corrected values as UTC.
    naive = pd.to_datetime(frame["timestamp"]).dt.tz_localize(None)
    shifted = naive + GT_OFFSET
    frame["timestamp"] = shifted.dt.tz_localize("UTC")

    frame["run"] = run_key
    return frame
def extract_injection_windows(run_key: str) -> pd.DataFrame:
    """Build injection windows from START_ANOMALY / STOP_ANOMALY pairs.

    Each START_ANOMALY row opens a window at its timestamp (``t_inj``).
    The window closes at the first later STOP_ANOMALY row with the same
    ``workload_type``. If there is none, it closes at the next
    START_ANOMALY of any type, or at the last timestamp of the run when no
    later start exists. The scenario label is derived from the start row's
    ``workload_type``.

    When the run has no START_ANOMALY rows at all, the windows are taken
    from contiguous ``is_anomaly`` blocks instead.

    Args:
        run_key: Key of the run to load.

    Returns:
        One row per window with the columns ``run``, ``scenario``,
        ``t_inj``, ``t_stop`` and ``duration_s``.
    """
    events = load_gt_corrected(run_key)
    events = events.sort_values("timestamp").reset_index(drop=True)
    start_events = events[events["action"] == "START_ANOMALY"].copy()
    stop_events = events[events["action"] == "STOP_ANOMALY"].copy()

    if start_events.empty:
        print(
            f" INFO {run_key}: keine START_ANOMALY-Zeilen, "
            f"nutze is_anomaly-Blöcke als Fallback"
        )
        return _extract_from_anomaly_blocks(events, run_key)

    def window_end(begin, workload):
        """Return the closing timestamp for a window opened at ``begin``."""
        same_type = stop_events["workload_type"] == workload
        after_begin = stop_events["timestamp"] > begin
        matching = stop_events[same_type & after_begin]
        if not matching.empty:
            return matching["timestamp"].iloc[0]
        # No matching stop marker: fall back to the next start, then to
        # the end of the run.
        later_starts = start_events[start_events["timestamp"] > begin]
        if later_starts.empty:
            return events["timestamp"].iloc[-1]
        return later_starts["timestamp"].iloc[0]

    rows = []
    for _, event in start_events.iterrows():
        begin = event["timestamp"]
        end = window_end(begin, event["workload_type"])
        label = str(event.get("workload_type", "")).lower()
        rows.append(
            {
                "run": run_key,
                "scenario": _infer_scenario(label),
                "t_inj": begin,
                "t_stop": end,
                "duration_s": (end - begin).total_seconds(),
            }
        )
    return pd.DataFrame(rows)
def _infer_scenario(workload_type: str) -> str:
    """Derive the scenario ID from a ``workload_type`` string.

    The input is lowercased and its underscores are turned into hyphens.
    Matching then happens in two stages:

    1. The first entry of SCENARIO_IDS whose normalised spelling (lowercase,
       hyphens) occurs as a substring is returned in its original spelling.
    2. Otherwise the first keyword of the fallback table that occurs as a
       substring selects the scenario.

    Args:
        workload_type: Raw workload type taken from the ground truth.

    Returns:
        The matched scenario ID, else the normalised input, or
        ``"unknown"`` when the input is empty.
    """
    wt = workload_type.lower().replace("_", "-")

    for sid in SCENARIO_IDS:
        # Compare both sides in the same normalised form. Testing the
        # underscore spelling of ``sid`` against ``wt`` could never succeed,
        # because ``wt`` no longer contains underscores at this point.
        if sid.lower().replace("_", "-") in wt:
            return sid

    # Keyword fallback; insertion order decides which keyword wins.
    # NOTE(review): matching is by plain substring, so a short key such as
    # "io" also hits inside longer words when no earlier key matches.
    mapping = {
        "slow": "slow-connection",
        "latency": "high-latency",
        "loss": "packet-loss",
        "cong": "congestion",
        "outage": "partial-outage",
        "flap": "flapping",
        "cpu": "cpu-stress",
        "io": "io-stress",
        "mem": "mem-stress",
    }
    for key, sid in mapping.items():
        if key in wt:
            return sid

    return wt if wt else "unknown"
def _extract_from_anomaly_blocks(df: pd.DataFrame, run_key: str) -> pd.DataFrame:
    """Fallback: turn contiguous ``is_anomaly == 1`` rows into windows.

    Rows are labelled with a scenario derived from ``workload_type`` and
    handled per scenario. Within a scenario, consecutive timestamps that
    are more than 90 seconds apart start a new window.

    Args:
        df: Ground-truth rows of one run.
        run_key: Run identifier copied into every output row.

    Returns:
        One row per window (``run``, ``scenario``, ``t_inj``, ``t_stop``,
        ``duration_s``), or an empty column-less frame when no row is
        flagged as anomalous.
    """
    max_gap = pd.Timedelta("90s")

    flagged = df[df["is_anomaly"] == 1].sort_values("timestamp")
    if flagged.empty:
        return pd.DataFrame()

    flagged = flagged.copy()
    flagged["scenario"] = flagged.apply(
        lambda rec: _infer_scenario(str(rec.get("workload_type", "")).lower()),
        axis=1,
    )

    records = []
    for scenario, members in flagged.groupby("scenario"):
        stamps = members["timestamp"].sort_values()
        # A gap larger than max_gap to the previous timestamp opens a new
        # block; the running sum of those breaks numbers the blocks.
        block_ids = (stamps.diff() > max_gap).cumsum().to_numpy()
        for _, block in stamps.groupby(block_ids):
            first = block.iloc[0]
            last = block.iloc[-1]
            records.append(
                {
                    "run": run_key,
                    "scenario": scenario,
                    "t_inj": first,
                    "t_stop": last,
                    "duration_s": (last - first).total_seconds(),
                }
            )
    return pd.DataFrame(records)
def extract_baseline_windows(run_key: str) -> pd.DataFrame:
    """Return the baseline window of one run.

    Baseline rows are those with ``scenario_mode == 'training'`` and
    ``is_anomaly == 0``. The window spans from the earliest to the latest
    such row.

    Args:
        run_key: Key of the run to load.

    Returns:
        A single-row frame (``run``, ``t_start``, ``t_end``,
        ``duration_s``), or an empty column-less frame when the run has no
        baseline rows.
    """
    events = load_gt_corrected(run_key).sort_values("timestamp")

    in_training = events["scenario_mode"] == "training"
    not_anomalous = events["is_anomaly"] == 0
    stamps = events.loc[in_training & not_anomalous, "timestamp"]
    if stamps.empty:
        return pd.DataFrame()

    first = stamps.iloc[0]
    last = stamps.iloc[-1]
    record = {
        "run": run_key,
        "t_start": first,
        "t_end": last,
        "duration_s": (last - first).total_seconds(),
    }
    return pd.DataFrame([record])
def extract_phase_windows(run_key: str) -> pd.DataFrame:
    """Extract workload phases from the ground truth of one run.

    A phase opens at each ``START_PHASE_<type>`` action and closes at the
    first later ``END_PHASE_<type>`` action of the same type. Without a
    matching end marker, the phase closes at the next later event of any
    kind, or at the last timestamp of the run.

    Args:
        run_key: Key of the run to load.

    Returns:
        One row per phase with the columns ``run``, ``phase``,
        ``t_start``, ``t_end`` and ``duration_s``.
    """
    events = load_gt_corrected(run_key)
    events = events.sort_values("timestamp").reset_index(drop=True)

    actions = events["action"]
    phase_starts = events[actions.str.startswith("START_PHASE_", na=False)].copy()
    phase_ends = events[actions.str.startswith("END_PHASE_", na=False)].copy()

    rows = []
    for _, opener in phase_starts.iterrows():
        phase = opener["action"].replace("START_PHASE_", "")
        began = opener["timestamp"]

        is_matching_end = phase_ends["action"] == f"END_PHASE_{phase}"
        is_later = phase_ends["timestamp"] > began
        closing = phase_ends[is_matching_end & is_later]

        if closing.empty:
            # No end marker: close at the next event, else at the run end.
            later = events[events["timestamp"] > began]
            if later.empty:
                ended = events["timestamp"].iloc[-1]
            else:
                ended = later["timestamp"].iloc[0]
        else:
            ended = closing["timestamp"].iloc[0]

        rows.append(
            {
                "run": run_key,
                "phase": phase,
                "t_start": began,
                "t_end": ended,
                "duration_s": (ended - began).total_seconds(),
            }
        )
    return pd.DataFrame(rows)
def main():
    """Run the ground-truth preparation for every configured run.

    Steps:
      1. Extract injection and baseline windows per run; runs whose GT file
         is missing are reported and skipped.
      2. Write ``injection_windows.csv`` and ``baseline_windows.csv``.
      3. Extract workload phases (best effort) and write
         ``phase_windows.csv`` when any were found.
      4. Print a per-scenario summary with a duration sanity check and
         store it in ``01_gt_summary.txt``.

    If no run could be loaded, nothing is written. If no injection window
    was found, the summary step is skipped.
    """
    print_section("01 Ground-Truth-Aufbereitung")
    print(
        f" Zeitversatz-Korrektur: GT-Zeitstempel + {GT_OFFSET} (Host UTC+1 → VM UTC, Subtraktion 1h)\n"
    )

    all_windows, all_baselines = [], []
    for run_key in RUNS:
        try:
            w = extract_injection_windows(run_key)
            b = extract_baseline_windows(run_key)
        except FileNotFoundError as e:
            print(f" WARNUNG {run_key}: {e}")
            continue
        all_windows.append(w)
        all_baselines.append(b)
        base_min = b["duration_s"].sum() / 60 if not b.empty else 0
        print(
            f" {run_key}: {len(w)} Injektionsfenster, Baseline {base_min:.1f} min"
        )

    # pd.concat raises ValueError on an empty list, which happens when every
    # run was skipped above.
    if not all_windows:
        print(" WARNUNG: keine GT-Datei konnte geladen werden, keine Ausgabe geschrieben")
        return

    windows_df = pd.concat(all_windows, ignore_index=True)
    baselines_df = pd.concat(all_baselines, ignore_index=True)
    windows_df.to_csv(OUTPUT_DIR / "injection_windows.csv", index=False)
    baselines_df.to_csv(OUTPUT_DIR / "baseline_windows.csv", index=False)

    all_phases = []
    for run_key in RUNS:
        # Phase extraction is best effort: any failure is reported and the
        # run is skipped so the remaining outputs are still produced.
        try:
            p = extract_phase_windows(run_key)
            if not p.empty:
                all_phases.append(p)
        except Exception as e:
            print(f" WARNUNG Phasen {run_key}: {e}")
    if all_phases:
        phases_df = pd.concat(all_phases, ignore_index=True)
        phases_df.to_csv(OUTPUT_DIR / "phase_windows.csv", index=False)
        print(f"{len(phases_df)} Phasen-Fenster extrahiert")

    if windows_df.empty:
        # Without any window there is no "scenario" column to group by.
        print(" WARNUNG: keine Injektionsfenster gefunden, Zusammenfassung übersprungen")
    else:
        print_section("Injektionsfenster pro Szenario")
        summary = (
            windows_df.groupby("scenario")
            .agg(
                n_windows=("run", "count"),
                dur_mean_s=("duration_s", "mean"),
                dur_std_s=("duration_s", "std"),
                dur_total_min=("duration_s", lambda x: x.sum() / 60),
            )
            .round(1)
        )
        print(summary.to_string())

        print_section("Sanity-Check Fensterlängen")
        for _, row in summary.iterrows():
            if row["dur_mean_s"] < 5:
                print(
                    f" WARNUNG: {row.name} mittlere Dauer {row['dur_mean_s']:.1f}s "
                    f"sehr kurz (START/STOP-Paare korrekt?)"
                )
            else:
                # iterrows() upcasts the count to float; format it without
                # decimals so it reads as a count.
                print(
                    f" OK: {row.name} {row['dur_mean_s']:.0f}s mittlere Dauer, "
                    f"{row['n_windows']:.0f} Fenster"
                )

        with open(OUTPUT_DIR / "01_gt_summary.txt", "w", encoding="utf-8") as f:
            f.write(f"GT-Zeitversatz-Korrektur: +{GT_OFFSET}\n")
            f.write(summary.to_string())
            f.write("\n\nAlle Injektionsfenster:\n")
            f.write(windows_df.to_string())

    print("\n→ Gespeichert: injection_windows.csv, baseline_windows.csv")
if __name__ == "__main__":
main()