"""
04_detection_behavior.py – detection behaviour & latency (REQ-NF06, TF3, TF4)
==============================================================================

GT source: injection_windows.csv (written by 01_ground_truth.py) – intervals
built from START_ANOMALY/STOP_ANOMALY pairs.

Matching strategy (overlap): a feature window [ws, ws + W] is labelled
GT-anomalous if

    ws < t_stop  AND  ws + W > t_inj

Outputs:
    - output/04_detection_latency_table.csv
    - output/04_detection_guete_table.csv
    - output/04_detection_latency_boxplot.pdf/png
    - output/04_sead_weights_{run_key}.pdf/png
    - output/04_detection_stats.txt
"""

import re  # NOTE(review): currently unused in this file
import sys

sys.path.insert(0, ".")

# pd, np, plt, mdates, sns, PALETTE, OUTPUT_DIR, RUNS, SCENARIO_IDS and the
# load_*/save_fig/print_section helpers are not imported anywhere else in this
# file, so they are expected to come from this star import.
from config import *  # noqa: E402,F403

DETECTORS = ["MAD", "RRCF-fast", "RRCF-mid", "RRCF-slow", "COPOD"]
WINDOW_S = 30

# Columns of the frame returned by compute_detection_latency(); also used to
# build an empty, correctly shaped latency frame in main().
_LATENCY_COLUMNS = [
    "run",
    "scenario",
    "t_inj",
    "t_stop",
    "t_det",
    "delta_t_s",
    "detected",
]


def _to_utc(ts):
    """Return *ts* as a UTC ``pd.Timestamp``; naive values are taken as UTC."""
    ts = pd.Timestamp(ts)
    return ts.tz_localize("UTC") if ts.tzinfo is None else ts.tz_convert("UTC")


def _series_to_utc(values: pd.Series) -> pd.Series:
    """Vectorised counterpart of :func:`_to_utc` for a whole Series.

    ``pd.to_datetime(..., utc=True)`` localizes naive values as UTC and
    converts tz-aware values to UTC, i.e. the same rule as ``_to_utc``.
    """
    return pd.to_datetime(values, utc=True)


def _overlap_mask(window_start: pd.Series, t_inj, t_stop) -> pd.Series:
    """Mask of windows [ws, ws + WINDOW_S] overlapping the interval (t_inj, t_stop)."""
    window_end = window_start + pd.Timedelta(seconds=WINDOW_S)
    return (window_start < t_stop) & (window_end > t_inj)


def _prf(tp: int, fp: int, fn: int) -> tuple:
    """Return (precision, recall, F1); each is 0.0 when its denominator is 0."""
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    f1 = (
        2 * precision * recall / (precision + recall)
        if (precision + recall) > 0
        else 0.0
    )
    return precision, recall, f1


def _alarm_mask(labeled: pd.DataFrame) -> pd.Series:
    """Bool mask of windows flagged by the pipeline (NaN / missing -> False)."""
    return labeled["pipeline_is_anomaly"].eq(True)


# ── Features with GT labels ───────────────────────────────────────────────────
def _attach_pipeline_alarms(run_key: str, feats: pd.DataFrame) -> pd.DataFrame:
    """Add ``pipeline_is_anomaly`` / ``pipeline_score`` columns to *feats*.

    Each alarm from anomalies.jsonl is attributed to the latest window that
    starts at most 35 s before it (``merge_asof``, direction="backward").
    Per window, the maximum ``is_anomaly`` flag and ``score`` are kept.
    Windows without an attributed alarm get ``False`` / ``NaN``.
    """
    anom = load_anomalies(run_key)
    # merge_asof needs non-null, sorted keys with the same tz and time unit
    # on both sides.
    anom = anom.assign(anom_ts=_series_to_utc(anom["timestamp"]).dt.as_unit("ns"))
    anom = anom.dropna(subset=["anom_ts"]).sort_values("anom_ts")
    feats = feats.assign(window_start=feats["window_start"].dt.as_unit("ns"))
    feats = feats.sort_values("window_start")

    merged = pd.merge_asof(
        anom[["anom_ts", "is_anomaly", "score"]],
        feats[["window_start"]],
        left_on="anom_ts",
        right_on="window_start",
        direction="backward",
        tolerance=pd.Timedelta("35s"),
    )
    # Alarms without a window inside the tolerance have window_start = NaT
    # and are dropped by groupby.
    agg = merged.groupby("window_start", as_index=False).agg(
        pipeline_is_anomaly=("is_anomaly", "max"),
        pipeline_score=("score", "max"),
    )
    agg["pipeline_is_anomaly"] = agg["pipeline_is_anomaly"].fillna(False).astype(bool)

    feats = feats.merge(
        agg[["window_start", "pipeline_score"]], on="window_start", how="left"
    )
    # Build the flag via isin instead of merging it. A left merge would leave an
    # object column with NaNs, on which `~` is an integer invert, not a
    # logical NOT.
    flagged = agg.loc[agg["pipeline_is_anomaly"], "window_start"]
    feats["pipeline_is_anomaly"] = feats["window_start"].isin(flagged)
    return feats


def build_labeled_features(run_key: str, gt_intervals: pd.DataFrame) -> pd.DataFrame:
    """
    Load the feature windows of *run_key* from DuckDB and label them.

    GT labels are attached via overlap matching against *gt_intervals*
    (columns ``t_inj``, ``t_stop``, ``scenario``). Pipeline alarms come from
    anomalies.jsonl. If those cannot be loaded or merged, a warning is
    printed and every window is treated as not flagged.

    Returns a frame with the columns ``window_start``, ``gt_label``,
    ``gt_scenario``, ``pipeline_is_anomaly``, ``pipeline_score`` and ``run``.
    Returns an empty frame if the features table cannot be loaded.
    """
    try:
        feats = load_duckdb_table(run_key, "features")
    except Exception as e:
        print(f" WARNUNG features: {e}")
        return pd.DataFrame()

    feats["window_start"] = _series_to_utc(feats["window_start"])
    feats = feats.sort_values("window_start").reset_index(drop=True)

    feats["gt_label"] = 0
    feats["gt_scenario"] = ""
    for _, iv in gt_intervals.iterrows():
        mask = _overlap_mask(
            feats["window_start"], _to_utc(iv["t_inj"]), _to_utc(iv["t_stop"])
        )
        # With overlapping intervals, the later interval's scenario wins.
        feats.loc[mask, "gt_label"] = 1
        feats.loc[mask, "gt_scenario"] = iv["scenario"]

    try:
        feats = _attach_pipeline_alarms(run_key, feats)
    except Exception as e:
        print(f" WARNUNG anomalies.jsonl: {e}")
        feats["pipeline_is_anomaly"] = False
        feats["pipeline_score"] = np.nan

    feats["run"] = run_key
    return feats[
        [
            "window_start",
            "gt_label",
            "gt_scenario",
            "pipeline_is_anomaly",
            "pipeline_score",
            "run",
        ]
    ]


# ── Detection latency ─────────────────────────────────────────────────────────
def compute_detection_latency(
    run_key: str, gt_intervals: pd.DataFrame, labeled: pd.DataFrame
) -> pd.DataFrame:
    """Detection latency for every GT interval of one run.

    The earliest pipeline-flagged window that overlaps an interval counts as
    its detection. Latency is measured from the END of that window, i.e. the
    first moment the whole window has elapsed:

        delta_t_s = (t_det + WINDOW_S) - t_inj,  t_det = window_start of first TP

    ``t_det`` in the result is the window start. Undetected intervals get
    ``t_det = NaT``, ``delta_t_s = NaN`` and ``detected = False``.
    """
    alarms = labeled[_alarm_mask(labeled)]
    rows = []
    for _, iv in gt_intervals.iterrows():
        t_inj = _to_utc(iv["t_inj"])
        t_stop = _to_utc(iv["t_stop"])
        tp_windows = alarms[_overlap_mask(alarms["window_start"], t_inj, t_stop)]
        if tp_windows.empty:
            t_det, delta, detected = pd.NaT, np.nan, False
        else:
            t_det = tp_windows["window_start"].min()
            t_det_effective = t_det + pd.Timedelta(seconds=WINDOW_S)
            delta = (t_det_effective - t_inj).total_seconds()
            detected = True
        rows.append(
            {
                "run": run_key,
                "scenario": iv["scenario"],
                "t_inj": t_inj,
                "t_stop": t_stop,
                "t_det": t_det,
                "delta_t_s": delta,
                "detected": detected,
            }
        )
    return pd.DataFrame(rows, columns=_LATENCY_COLUMNS)


# ── Precision / Recall / F1 ───────────────────────────────────────────────────
def compute_prf(labeled: pd.DataFrame, run_key: str) -> dict:
    """Window-level confusion counts and precision/recall/F1 for one run.

    ``per_scenario`` maps each GT scenario to its precision/recall/F1,
    computed over the windows labelled with that scenario.
    """
    alarm = _alarm_mask(labeled)
    positive = labeled["gt_label"] == 1
    negative = labeled["gt_label"] == 0

    tp = int((alarm & positive).sum())
    fp = int((alarm & negative).sum())
    fn = int((~alarm & positive).sum())
    tn = int((~alarm & negative).sum())
    precision, recall, f1 = _prf(tp, fp, fn)

    per_scenario = {}
    for scen in labeled.loc[positive, "gt_scenario"].unique():
        in_scen = labeled["gt_scenario"] == scen
        # NOTE: build_labeled_features sets gt_scenario only on GT-positive
        # windows. So s_fp is always 0 for its output, and the per-scenario
        # precision degenerates to 1.0 (s_tp > 0) or 0.0.
        s_tp = int((in_scen & alarm & positive).sum())
        s_fp = int((in_scen & alarm & negative).sum())
        s_fn = int((in_scen & ~alarm & positive).sum())
        s_p, s_r, s_f = _prf(s_tp, s_fp, s_fn)
        per_scenario[scen] = {"precision": s_p, "recall": s_r, "f1": s_f}

    return {
        "run": run_key,
        "tp": tp,
        "fp": fp,
        "fn": fn,
        "tn": tn,
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "per_scenario": per_scenario,
    }


# ── SEAD weights ──────────────────────────────────────────────────────────────
def plot_sead_weights(run_key: str, gt_intervals: pd.DataFrame):
    """Plot each detector's SEAD ensemble weight over time for one run.

    GT intervals are shaded and labelled with their scenario name. Nothing is
    plotted if anomalies.jsonl cannot be loaded or lacks a weight column.
    """
    try:
        anom = load_anomalies(run_key)
    except Exception:
        return
    weight_cols = [f"{det}_weight" for det in DETECTORS]
    if any(col not in anom.columns for col in weight_cols):
        return

    fig, ax = plt.subplots(figsize=(14, 4))
    colors = plt.cm.tab10.colors
    for i, (det, col) in enumerate(zip(DETECTORS, weight_cols)):
        ax.plot(
            anom["timestamp"],
            anom[col],
            label=det,
            color=colors[i % len(colors)],
            lw=0.9,
            alpha=0.85,
        )

    # x in data coordinates, y in axes coordinates (0 = bottom, 1 = top): the
    # scenario labels stay at the top edge regardless of the y-limits set
    # below.
    label_transform = ax.get_xaxis_transform()
    for _, row in gt_intervals.iterrows():
        t0, t1 = _to_utc(row["t_inj"]), _to_utc(row["t_stop"])
        ax.axvspan(t0, t1, alpha=0.08, color=PALETTE["anomaly"])
        ax.text(
            t0,
            0.98,
            row["scenario"],
            transform=label_transform,
            rotation=90,
            fontsize=6,
            va="top",
            color="firebrick",
            alpha=0.6,
        )

    # 0.2 = 1 / len(DETECTORS): the weight of an evenly balanced ensemble.
    ax.axhline(0.2, color="gray", linestyle=":", lw=0.8, label="Gleichgewicht")
    ax.set_ylim(0, 0.6)
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M"))
    ax.set_xlabel("Zeit (UTC)")
    ax.set_ylabel("Ensemble-Gewicht $w_i$")
    ax.set_title(f"SEAD-Ensemble-Gewichte – {run_key}")
    ax.legend(loc="upper right", ncol=3, fontsize=8)
    save_fig(f"04_sead_weights_{run_key}")
    # Release the figure; main() creates one per run.
    plt.close(fig)


# ── Plots & tables ────────────────────────────────────────────────────────────
def plot_latency_boxplot(latency_df: pd.DataFrame):
    """Box plot of detection latency per scenario over all runs.

    Horizontal reference lines mark one and two window lengths. Nothing is
    plotted when no interval was detected.
    """
    valid = latency_df.dropna(subset=["delta_t_s"])
    if valid.empty:
        return

    fig, ax = plt.subplots(figsize=(13, 5))
    order = sorted(valid["scenario"].unique())
    sns.boxplot(
        data=valid,
        x="scenario",
        y="delta_t_s",
        order=order,
        palette="Blues_d",
        ax=ax,
        flierprops={"markersize": 3},
        hue="scenario",
        legend=False,
    )
    ax.axhline(
        WINDOW_S,
        color="orange",
        linestyle="--",
        lw=1.0,
        label=f"Min. Latenz = 1 Fenster ({WINDOW_S}s)",
    )
    ax.axhline(
        2 * WINDOW_S,
        color="red",
        linestyle=":",
        lw=0.8,
        label=f"2 Fenster ({2 * WINDOW_S}s)",
    )
    ax.set_xlabel("Szenario")
    ax.set_ylabel("Detektionslatenz Δt (s)")
    ax.set_title("Detektionslatenz pro Szenario (alle Läufe und Profile)")
    ax.tick_params(axis="x", rotation=35)
    ax.legend(fontsize=8)
    save_fig("04_detection_latency_boxplot")
    plt.close(fig)


def build_latency_table(lat: pd.DataFrame) -> pd.DataFrame:
    """Summarise detection latency per scenario, in ``SCENARIO_IDS`` order.

    Latency statistics use detected intervals only. They are "—" when no
    interval was detected; the IQR additionally needs at least 4 values.
    """
    rows = []
    for scen in SCENARIO_IDS:
        sub = lat[lat["scenario"] == scen]
        delays = sub["delta_t_s"].dropna()
        has_rows = not sub.empty
        has_delays = not delays.empty
        rows.append(
            {
                "Szenario": scen,
                "n_detected": int(sub["detected"].sum()) if has_rows else 0,
                "n_total": len(sub),
                "Recall_%": f"{sub['detected'].mean() * 100:.0f}" if has_rows else "0",
                "Median_s": f"{delays.median():.1f}" if has_delays else "—",
                "IQR_s": f"{delays.quantile(0.75) - delays.quantile(0.25):.1f}"
                if len(delays) >= 4
                else "—",
                "Min_s": f"{delays.min():.1f}" if has_delays else "—",
                "Max_s": f"{delays.max():.1f}" if has_delays else "—",
            }
        )
    return pd.DataFrame(rows)


def _mean_std(values: list) -> tuple:
    """Return (mean, population std) of *values*, or (0.0, 0.0) if empty."""
    if not values:
        return 0.0, 0.0
    return np.mean(values), np.std(values)


def build_guete_table(all_metrics: list) -> pd.DataFrame:
    """Per-scenario precision/recall/F1 as "mean ± std" over runs.

    Each scenario row aggregates the ``per_scenario`` entries of the
    ``compute_prf`` results in *all_metrics*. A final "Makro-Mittel" row
    holds the mean of the run-level scores.
    """
    scenario_agg = {scen: {"p": [], "r": [], "f": []} for scen in SCENARIO_IDS}
    g_p, g_r, g_f = [], [], []
    for m in all_metrics:
        if not m:
            continue
        g_p.append(m["precision"])
        g_r.append(m["recall"])
        g_f.append(m["f1"])
        for scen, v in m["per_scenario"].items():
            if scen in scenario_agg:
                scenario_agg[scen]["p"].append(v["precision"])
                scenario_agg[scen]["r"].append(v["recall"])
                scenario_agg[scen]["f"].append(v["f1"])

    rows = []
    for scen in SCENARIO_IDS:
        v = scenario_agg[scen]
        p_mean, p_std = _mean_std(v["p"])
        r_mean, r_std = _mean_std(v["r"])
        f_mean, f_std = _mean_std(v["f"])
        rows.append(
            {
                "Szenario": scen,
                "Precision": f"{p_mean:.3f} ± {p_std:.3f}",
                "Recall": f"{r_mean:.3f} ± {r_std:.3f}",
                "F1-Score": f"{f_mean:.3f} ± {f_std:.3f}",
            }
        )
    if g_f:
        rows.append(
            {
                "Szenario": "Makro-Mittel",
                "Precision": f"{np.mean(g_p):.3f}",
                "Recall": f"{np.mean(g_r):.3f}",
                "F1-Score": f"{np.mean(g_f):.3f}",
            }
        )
    return pd.DataFrame(rows)


# ── main ──────────────────────────────────────────────────────────────────────
def main():
    """Evaluate detection behaviour for every run in RUNS.

    Writes the latency/quality tables, the latency box plot, per-run SEAD
    weight plots and the stats file to OUTPUT_DIR.
    """
    print_section("04 – Detektionsverhalten (REQ-NF06, TF3, TF4)")
    print(" GT-Quelle: injection_windows.csv (Zeitversatz bereits korrigiert)")
    print(f" Matching: Overlap ws < t_stop AND ws+{WINDOW_S}s > t_inj")

    windows_df = pd.read_csv(
        OUTPUT_DIR / "injection_windows.csv", parse_dates=["t_inj", "t_stop"]
    )

    all_latency, all_metrics = [], []
    for run_key in RUNS:
        print(f"\n {run_key} ...")
        gt = windows_df[windows_df["run"] == run_key].copy()
        if gt.empty:
            print(" WARNUNG: keine GT-Fenster")
            continue

        labeled = build_labeled_features(run_key, gt)
        if labeled.empty:
            continue
        n_gt = int(labeled["gt_label"].sum())
        n_alm = int(labeled["pipeline_is_anomaly"].sum())
        print(
            f" Fenster: {len(labeled)} gesamt, "
            f"{n_gt} GT-positiv, {n_alm} Pipeline-Alarme"
        )

        lat = compute_detection_latency(run_key, gt, labeled)
        dr = lat["detected"].mean() * 100 if not lat.empty else 0
        ml = lat["delta_t_s"].median()
        print(
            f" Detektionsrate={dr:.0f}% "
            + (f"Median-Latenz={ml:.1f}s" if not np.isnan(ml) else "keine Detektionen")
        )
        all_latency.append(lat)

        m = compute_prf(labeled, run_key)
        all_metrics.append(m)
        print(
            f" P={m['precision']:.3f} R={m['recall']:.3f} F1={m['f1']:.3f} "
            f"TP={m['tp']} FP={m['fp']} FN={m['fn']}"
        )

        plot_sead_weights(run_key, gt)

    # pd.concat([]) raises ValueError. If no run produced latency rows, fall
    # back to an empty frame with the expected columns.
    frames = [run_lat for run_lat in all_latency if not run_lat.empty]
    latency_df = (
        pd.concat(frames, ignore_index=True)
        if frames
        else pd.DataFrame(columns=_LATENCY_COLUMNS)
    )

    print_section("Detektionslatenz pro Szenario")
    lat_tbl = build_latency_table(latency_df)
    print(lat_tbl.to_string(index=False))
    lat_tbl.to_csv(OUTPUT_DIR / "04_detection_latency_table.csv", index=False)

    print_section("Detektionsgüte (Precision / Recall / F1)")
    guete_tbl = build_guete_table(all_metrics)
    print(guete_tbl.to_string(index=False))
    guete_tbl.to_csv(OUTPUT_DIR / "04_detection_guete_table.csv", index=False)

    plot_latency_boxplot(latency_df)

    # Micro-averaged scores: confusion counts summed over all evaluated runs.
    all_tp = sum(m["tp"] for m in all_metrics)
    all_fp = sum(m["fp"] for m in all_metrics)
    all_fn = sum(m["fn"] for m in all_metrics)
    gp, gr, gf = _prf(all_tp, all_fp, all_fn)
    print_section(f"Globale Summen (alle {len(all_metrics)} Läufe)")
    print(f" TP={all_tp} FP={all_fp} FN={all_fn}")
    print(f" Precision={gp:.3f} Recall={gr:.3f} F1={gf:.3f}")

    # Explicit UTF-8: the report contains non-ASCII characters (–, ü, ±, —).
    with open(OUTPUT_DIR / "04_detection_stats.txt", "w", encoding="utf-8") as f:
        f.write(f"Detektionsverhalten – Overlap-Matching (W={WINDOW_S}s)\n")
        f.write("=" * 65 + "\n")
        f.write("Latenz:\n" + lat_tbl.to_string(index=False))
        f.write("\n\nGüte:\n" + guete_tbl.to_string(index=False))
        f.write(
            f"\n\nGlobal: TP={all_tp} FP={all_fp} FN={all_fn} "
            f"P={gp:.3f} R={gr:.3f} F1={gf:.3f}\n"
        )
    print("\n→ Gespeichert: 04_detection_*.csv, 04_sead_weights_*.pdf/png")


if __name__ == "__main__":
    main()