# bachelor-thesis/evaluation/03_resource_overhead.py
# (Header copied from a repository web viewer: 570 lines, 19 KiB, Python.
#  The viewer flagged the file as containing ambiguous Unicode characters.)

"""
03_resource_overhead.py Ressourceneffizienz (REQ-NF01, NF04, NF05)
==============================================================================
Differenz-basierte Overhead-Berechnung:
Ebene 0: system_baseline
OS + MFT-Software idle, keine Pipeline, kein Transfer
→ Absoluter Nullpunkt
Ebene 1: pipeline_idle_baseline system_baseline
= PIPELINE-OVERHEAD (isoliert)
→ Beantwortet REQ-NF1 und REQ-NF2 direkt
Ebene 2: workload_baseline_phase pipeline_idle_baseline
= TRANSFER-OVERHEAD (MFT-Engine + TIXstream unter Last)
→ Trennt Pipeline-Overhead vom Transfer-Overhead
Ebene 3: workload_injection_phase workload_baseline_phase
= CHAOS-IMPACT auf Ressourcen
Alle RAM- und CPU-Werte werden als DIFFERENZEN zum jeweiligen Nullpunkt
dargestellt, nicht als absolute Werte.
Ausgabe:
- output/03_resource_table.csv
- output/03_resource_overhead_stacked.pdf/png (gestapelte Differenzen)
- output/03_cpu_overhead_timeseries_{run}.pdf/png
- output/03_resource_stats.txt
"""
import sys
sys.path.insert(0, ".")
from config import *
# Directories of the two reference measurements, located below ROOT (config).
# Level 0 reference: OS + MFT software idle, no pipeline, no transfer.
SYSTEM_BASELINE_DIR = ROOT.joinpath("pipeline_system_baseline")
# Level 1 reference: pipeline running idle.
IDLE_BASELINE_DIR = ROOT.joinpath("pipeline_idle_pipeline_baseline")
def _to_utc(ts):
ts = pd.Timestamp(ts)
return ts.tz_localize("UTC") if ts.tzinfo is None else ts.tz_convert("UTC")
def load_extra_baseline(directory: Path) -> pd.DataFrame:
    """Load ``baseline_metrics.csv`` from *directory*.

    The ``timestamp`` column is parsed as Unix epoch seconds and replaced
    (in place, same column position) by tz-aware UTC datetimes.

    Raises:
        FileNotFoundError: if the CSV does not exist (propagated from pandas).
    """
    raw = pd.read_csv(directory / "baseline_metrics.csv")
    return raw.assign(
        timestamp=pd.to_datetime(raw["timestamp"], unit="s", utc=True)
    )
GT_OFFSET = pd.Timedelta("-1h")
def load_workload_metrics(run_key: str) -> pd.DataFrame:
    """
    Load the workload metrics of one run and tag them with the run key.

    Reads ``baseline_metrics.csv`` from ``ROOT / RUNS[run_key][1]`` via
    ``load_extra_baseline`` (epoch-second ``timestamp`` -> UTC datetimes) and
    adds a ``run`` column containing *run_key*.

    Raises:
        KeyError: if *run_key* is not in ``RUNS``.
        FileNotFoundError: if the metrics file does not exist.
    """
    _, pipeline_dir = RUNS[run_key]
    # Same file name and timestamp format as the reference measurements,
    # so reuse the shared loader instead of duplicating the parsing.
    df = load_extra_baseline(ROOT / pipeline_dir)
    df["run"] = run_key
    return df
def get_phase_boundaries(run_key: str) -> dict:
    """
    Read the experiment phase markers of one run from its ground-truth CSV.

    The GT CSV ``ROOT / RUNS[run_key][0]`` is read. Its ``timestamp`` column
    has any tz info dropped (wall-clock time kept), is shifted by
    ``GT_OFFSET`` and is then tagged as UTC. The ``action`` column is
    evaluated as follows:

    - ``EXPERIMENT_BASELINE_END``: end of the baseline phase. If absent, the
      earliest ``START_ANOMALY`` / ``EXPERIMENT_CHAOS_START`` is used.
    - ``EXPERIMENT_CHAOS_START``: each start is paired with the earliest later
      ``EXPERIMENT_CHAOS_END`` of the same ``workload_type``. Without a
      ``workload_type`` column, the start is paired with the earliest later end.
      A start without any matching end yields a zero-length interval.

    Args:
        run_key: Key into ``RUNS``.

    Returns:
        dict with
        ``baseline_end``: end of the baseline phase (UTC ``Timestamp``) or
        ``None`` if no marker was found;
        ``chaos_intervals``: list of ``(start_utc, end_utc, scenario)`` tuples,
        where ``scenario`` is the lower-cased ``workload_type``/``scenario``.
    """
    gt_file, _ = RUNS[run_key]
    gt = pd.read_csv(ROOT / gt_file, parse_dates=["timestamp"])
    gt["timestamp"] = (
        pd.to_datetime(gt["timestamp"]).dt.tz_localize(None) + GT_OFFSET
    ).dt.tz_localize("UTC")

    action = gt["action"]
    bl_end_rows = gt[action == "EXPERIMENT_BASELINE_END"]
    if bl_end_rows.empty:
        # Fallback: the first anomaly/chaos start closes the baseline phase.
        bl_end_rows = gt[action.isin(["START_ANOMALY", "EXPERIMENT_CHAOS_START"])]
    # min() rather than iloc[0]: do not rely on the CSV being sorted by time.
    baseline_end = bl_end_rows["timestamp"].min() if not bl_end_rows.empty else None

    has_type = "workload_type" in gt.columns
    chaos_starts = gt[action == "EXPERIMENT_CHAOS_START"]
    chaos_ends = gt[action == "EXPERIMENT_CHAOS_END"]

    chaos_intervals = []
    for _, row in chaos_starts.iterrows():
        scenario = str(row.get("workload_type", row.get("scenario", "unknown"))).lower()
        t_start = row["timestamp"]
        candidates = chaos_ends[chaos_ends["timestamp"] > t_start]
        if has_type:
            # Only an end marker of the same workload type closes this interval.
            candidates = candidates[candidates["workload_type"] == row["workload_type"]]
        t_end = candidates["timestamp"].min() if not candidates.empty else t_start
        chaos_intervals.append((t_start, t_end, scenario))

    return {
        "baseline_end": baseline_end,
        "chaos_intervals": chaos_intervals,
    }
def split_by_phase(
    bm: pd.DataFrame,
    run_key: str,
    windows_df: pd.DataFrame,
    baseline_windows: pd.DataFrame,
) -> dict:
    """
    Split a run's metric samples into baseline and injection phases.

    Phase boundaries come from ``get_phase_boundaries`` (GT CSV markers):

    - baseline: samples strictly before ``baseline_end - 30 s``. The 30 s
      guard band keeps samples near the phase change out of the baseline.
    - injection: samples inside any chaos interval (inclusive bounds) that
      are not already part of the baseline.

    If no baseline end is known, every sample is returned as baseline and a
    warning is printed.

    Args:
        bm: Metric samples with a ``timestamp`` column. This is expected to be
            tz-aware UTC, as produced by ``load_workload_metrics``.
        run_key: Key into ``RUNS``.
        windows_df, baseline_windows: Currently unused; kept for interface
            compatibility.

    Returns:
        ``{"baseline": DataFrame, "injection": DataFrame}``. Both frames keep
        the columns of *bm*, even when empty.
    """
    boundaries = get_phase_boundaries(run_key)
    baseline_end = boundaries["baseline_end"]
    chaos_intervals = boundaries["chaos_intervals"]
    bm = bm.copy()
    ts = bm["timestamp"]

    if baseline_end is None or pd.isna(baseline_end):
        print(f" WARNUNG {run_key}: kein EXPERIMENT_BASELINE_END gefunden")
        # Use an empty slice rather than pd.DataFrame(): callers read metric
        # columns from the injection frame, so those columns must exist.
        return {"baseline": bm, "injection": bm.iloc[0:0]}

    baseline_safe_end = baseline_end - pd.Timedelta("30s")
    in_baseline = ts < baseline_safe_end
    in_injection = pd.Series(False, index=bm.index)
    for t_start, t_end, _scenario in chaos_intervals:
        in_injection |= (ts >= t_start) & (ts <= t_end)
    in_injection &= ~in_baseline

    n_bl = int(in_baseline.sum())
    n_inj = int(in_injection.sum())
    # ts.min() is NaT for empty/all-NaT input, and NaT.strftime raises.
    if ts.notna().any():
        bm_min = ts.min().strftime("%H:%M")
        bm_max = ts.max().strftime("%H:%M")
    else:
        bm_min = bm_max = "--:--"
    bl_end_h = baseline_end.strftime("%H:%M")
    print(
        f" {run_key}: bm={bm_min}-{bm_max} UTC, "
        f"baseline_end={bl_end_h} UTC → baseline={n_bl}, injection={n_inj}"
    )
    return {
        "baseline": bm[in_baseline],
        "injection": bm[in_injection],
    }
def compute_reference_stats(df: pd.DataFrame) -> dict:
    """Summarise a reference measurement.

    For CPU (``cpu_percent``) and RAM (``memory_used_mb``) this returns the
    median, the 95th percentile and the raw values as a NumPy array. They are
    stored under the keys ``{cpu,ram}_median``, ``{cpu,ram}_p95`` and
    ``{cpu,ram}_arr``.
    """
    stats = {}
    for prefix, column in (("cpu", "cpu_percent"), ("ram", "memory_used_mb")):
        series = df[column]
        stats[f"{prefix}_median"] = float(series.median())
        stats[f"{prefix}_p95"] = float(series.quantile(0.95))
        stats[f"{prefix}_arr"] = series.values
    return stats
def _median_p95(values: np.ndarray) -> tuple:
    """Return ``(median, P95)`` of *values* ignoring NaN, or ``(nan, nan)`` if empty."""
    if len(values) == 0:
        return np.nan, np.nan
    return float(np.nanmedian(values)), float(np.nanpercentile(values, 95))


def analyze_profile(
    profile_name: str,
    run_keys: list,
    windows_df: pd.DataFrame,
    baseline_windows: pd.DataFrame,
    sys_ref: dict,
    idle_ref: dict,
) -> dict:
    """
    Compute CPU and RAM differences on all overhead levels for one profile.

    Each run in *run_keys* with an existing metrics file is split into
    baseline and injection samples via ``split_by_phase``. The samples are
    pooled across runs. Runs whose metrics file is missing are skipped.

    - Transfer overhead (level 2): pooled baseline samples minus the median
      of the pipeline-idle reference (``idle_ref["{cpu,ram}_median"]``).
    - Chaos impact (level 3): pooled injection samples minus the median of
      the pooled baseline samples.
    - Stability: mean +/- population std (ddof=0) of the per-run baseline
      medians.

    Args:
        profile_name: Label stored under ``"profile"``.
        run_keys: Run keys belonging to this profile.
        windows_df, baseline_windows: Forwarded to ``split_by_phase``.
        sys_ref: Currently unused here; kept for interface compatibility.
        idle_ref: Output of ``compute_reference_stats`` for the idle pipeline.

    Returns:
        ``{}`` if fewer than 5 baseline samples were collected. Otherwise a
        dict of medians/P95s, where NaN is ignored and chaos values are NaN
        when there is no injection data. Keys starting with ``_`` hold the
        underlying arrays for plotting.
    """
    cpu_bl_abs, cpu_inj_abs = [], []
    ram_bl_abs, ram_inj_abs = [], []
    cpu_runs, ram_runs = {}, {}
    for rk in run_keys:
        try:
            bm = load_workload_metrics(rk)
        except FileNotFoundError:
            continue
        phases = split_by_phase(bm, rk, windows_df, baseline_windows)
        baseline, injection = phases["baseline"], phases["injection"]
        if not baseline.empty:
            cpu_runs[rk] = baseline["cpu_percent"].median()
            ram_runs[rk] = baseline["memory_used_mb"].median()
            cpu_bl_abs.extend(baseline["cpu_percent"].tolist())
            ram_bl_abs.extend(baseline["memory_used_mb"].tolist())
        # An empty injection frame may lack the metric columns entirely
        # (e.g. a bare pd.DataFrame()), so only index it when it has rows.
        if not injection.empty:
            cpu_inj_abs.extend(injection["cpu_percent"].tolist())
            ram_inj_abs.extend(injection["memory_used_mb"].tolist())

    if len(cpu_bl_abs) < 5:
        return {}

    cpu_run_vals = list(cpu_runs.values())
    ram_run_vals = list(ram_runs.values())
    cpu_stability = f"{np.mean(cpu_run_vals):.2f} ± {np.std(cpu_run_vals):.2f}"
    ram_stability = f"{np.mean(ram_run_vals):.1f} ± {np.std(ram_run_vals):.1f}"

    cpu_bl = np.array(cpu_bl_abs)
    cpu_inj = np.array(cpu_inj_abs)
    ram_bl = np.array(ram_bl_abs)
    ram_inj = np.array(ram_inj_abs)

    # NaN-aware statistics throughout, consistent with the pandas per-run
    # medians above (pandas skips NaN; plain np.median would propagate it).
    cpu_bl_med, cpu_bl_p95 = _median_p95(cpu_bl)
    ram_bl_med, ram_bl_p95 = _median_p95(ram_bl)

    # Level 2: transfer overhead relative to the pipeline-idle reference.
    cpu_transfer_overhead = cpu_bl - idle_ref["cpu_median"]
    ram_transfer_overhead = ram_bl - idle_ref["ram_median"]
    # Level 3: chaos impact relative to this profile's own baseline median
    # (empty arrays when there is no injection data).
    cpu_chaos = cpu_inj - cpu_bl_med
    ram_chaos = ram_inj - ram_bl_med

    cpu_tr_med, cpu_tr_p95 = _median_p95(cpu_transfer_overhead)
    ram_tr_med, ram_tr_p95 = _median_p95(ram_transfer_overhead)
    cpu_ch_med, cpu_ch_p95 = _median_p95(cpu_chaos)
    ram_ch_med, ram_ch_p95 = _median_p95(ram_chaos)

    return {
        "profile": profile_name,
        "n_bl": len(cpu_bl),
        "n_inj": len(cpu_inj),
        "cpu_stability": cpu_stability,
        "ram_stability": ram_stability,
        "cpu_bl_abs_median": cpu_bl_med,
        "cpu_bl_abs_p95": cpu_bl_p95,
        "ram_bl_abs_median": ram_bl_med,
        "ram_bl_abs_p95": ram_bl_p95,
        "cpu_transfer_med": cpu_tr_med,
        "cpu_transfer_p95": cpu_tr_p95,
        "ram_transfer_med": ram_tr_med,
        "ram_transfer_p95": ram_tr_p95,
        "cpu_chaos_med": cpu_ch_med,
        "cpu_chaos_p95": cpu_ch_p95,
        "ram_chaos_med": ram_ch_med,
        "ram_chaos_p95": ram_ch_p95,
        # Arrays for plotting
        "_cpu_bl_abs": cpu_bl,
        "_ram_bl_abs": ram_bl,
        "_cpu_transfer": cpu_transfer_overhead,
        "_ram_transfer": ram_transfer_overhead,
        "_cpu_chaos": cpu_chaos,
    }
def plot_stacked_overhead(
    sys_ref: dict, idle_ref: dict, pipeline_overhead: dict, results: list
):
    """
    Draw a stacked bar chart of the overhead layers, one bar per profile.

    For CPU (left panel) and RAM (right panel):
      level 0: system-baseline median (grey)
      level 1: + pipeline-overhead median (green)
      level 2: + transfer-overhead median per profile, clipped at 0 (blue)
      level 3: chaos-impact median as an error bar on top of the stack (red).
               Positive impact extends upward, negative impact downward.

    Empty entries (``{}``) in *results* are skipped. *idle_ref* is not read
    here; the pipeline layer comes from *pipeline_overhead*. The figure is
    written via ``save_fig("03_resource_overhead_stacked")``.
    """
    _, (ax1, ax2) = plt.subplots(1, 2, figsize=(13, 6))
    valid = [r for r in results if r]
    profiles = [r["profile"] for r in valid]
    x = np.arange(len(profiles))
    width = 0.55
    for ax, metric, unit in [
        (ax1, "cpu", "%"),
        (ax2, "ram", "MB"),
    ]:
        sys_val = sys_ref[f"{metric}_median"]
        pipeline_val = pipeline_overhead[f"{metric}_overhead_median"]
        sys_arr = np.full(len(x), sys_val)
        pipe_arr = np.full(len(x), pipeline_val)
        transfer_med = np.array(
            [r[f"{metric}_transfer_med"] for r in valid], dtype=float
        )
        chaos_med = np.array([r[f"{metric}_chaos_med"] for r in valid], dtype=float)
        has_chaos = ~np.isnan(chaos_med)
        chaos_clean = np.where(has_chaos, chaos_med, 0.0)
        # A negative transfer overhead cannot be stacked; draw it as 0.
        transfer_plot = np.maximum(transfer_med, 0.0)
        ax.bar(
            x,
            sys_arr,
            width,
            label=f"System-Baseline ({sys_val:.1f} {unit})",
            color="#B0BEC5",
            zorder=2,
        )
        ax.bar(
            x,
            pipe_arr,
            width,
            bottom=sys_arr,
            label=f"Pipeline-Overhead ({pipeline_val:+.1f} {unit})",
            color="#66BB6A",
            zorder=2,
        )
        ax.bar(
            x,
            transfer_plot,
            width,
            bottom=sys_arr + pipe_arr,
            label="Transfer-Overhead (Median, ≥0)",
            color="#42A5F5",
            zorder=2,
        )
        if np.any(has_chaos):
            totals = sys_arr + pipe_arr + transfer_plot
            # Matplotlib rejects negative yerr values, so split the signed
            # impact into a downward (lower) and an upward (upper) length.
            ax.errorbar(
                x,
                totals,
                yerr=[np.maximum(-chaos_clean, 0.0), np.maximum(chaos_clean, 0.0)],
                fmt="none",
                color="firebrick",
                capsize=4,
                lw=1.5,
                label="Chaos-Impact (Median)",
            )
        else:
            ax.text(
                0.5,
                0.97,
                "Chaos-Impact: keine Injektionsdaten (Zeitfenster außerhalb Aufzeichnung)",
                transform=ax.transAxes,
                ha="center",
                va="top",
                fontsize=7,
                color="gray",
                bbox=dict(boxstyle="round,pad=0.3", fc="white", alpha=0.7),
            )
        ax.set_xticks(x)
        ax.set_xticklabels(profiles, rotation=15)
        ax.set_ylabel(
            f"{'CPU-Auslastung' if metric == 'cpu' else 'RAM-Verbrauch'} ({unit})"
        )
        ax.set_title(f"{'CPU' if metric == 'cpu' else 'RAM'}: Overhead-Schichten")
        ax.legend(fontsize=7.5, loc="upper left")
        ax.set_ylim(bottom=0)
    plt.suptitle(
        "Ressourcen-Overhead-Analyse: System → Pipeline → Transfer → Chaos", fontsize=11
    )
    plt.tight_layout()
    save_fig("03_resource_overhead_stacked")
def plot_cpu_overhead_timeseries(
    run_key: str, windows_df: pd.DataFrame, sys_ref: dict, idle_ref: dict
):
    """
    Plot one run's CPU overhead over time, relative to the system baseline.

    The plotted value is ``cpu_percent - sys_ref["cpu_median"]``, resampled
    to 10 s medians. It is a difference, not absolute CPU load.

    Reference lines are drawn at the pipeline overhead
    (``idle_ref["cpu_median"] - sys_ref["cpu_median"]``) and at zero.
    Injection windows of this run from *windows_df* are shaded and labelled
    with their scenario. The figure is saved via ``save_fig``. Returns
    without plotting if the run's metrics file is missing.
    """
    try:
        bm = load_workload_metrics(run_key)
    except FileNotFoundError:
        return
    inj = windows_df[windows_df["run"] == run_key].copy()
    inj["t_inj"] = inj["t_inj"].apply(_to_utc)
    inj["t_stop"] = inj["t_stop"].apply(_to_utc)
    bm["cpu_overhead"] = bm["cpu_percent"] - sys_ref["cpu_median"]
    pipeline_overhead_val = idle_ref["cpu_median"] - sys_ref["cpu_median"]

    _, ax = plt.subplots(figsize=(14, 4))
    ts_r = bm.set_index("timestamp")["cpu_overhead"].resample("10s").median()
    ax.plot(
        ts_r.index,
        ts_r.values,
        color=PALETTE["pipeline"],
        lw=0.8,
        label="CPU-Overhead (workload - sys_baseline)",
    )
    ax.axhline(
        pipeline_overhead_val,
        color="#66BB6A",
        linestyle="--",
        lw=1.2,
        label=f"Pipeline-Overhead-Referenz ({pipeline_overhead_val:+.1f}%)",
    )
    ax.axhline(0, color="gray", linestyle=":", lw=0.7, alpha=0.5)

    # Upper y-limit: 1.15 x the 99th percentile of the resampled series, but
    # at least 15. With no data (empty or all-NaN), the 15 floor applies.
    q99 = float(ts_r.quantile(0.99)) if not ts_r.empty else float("nan")
    y_top = max(15.0, q99 * 1.15) if np.isfinite(q99) else 15.0

    for _, row in inj.iterrows():
        ax.axvspan(row["t_inj"], row["t_stop"], alpha=0.10, color=PALETTE["anomaly"])
        # Labels hang from the top edge of the visible area. They are never
        # placed at a NaN or negative y position.
        ax.text(
            row["t_inj"],
            y_top,
            row["scenario"],
            rotation=90,
            fontsize=5,
            va="top",
            color="firebrick",
            alpha=0.6,
        )
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M"))
    ax.set_xlabel("Zeit (UTC)")
    ax.set_ylabel("CPU-Overhead relativ zu System-Baseline (%)")
    ax.set_title(f"CPU-Overhead-Zeitreihe {run_key}")
    ax.set_ylim(bottom=-5, top=y_top)
    ax.legend(loc="upper right", fontsize=8)
    save_fig(f"03_cpu_overhead_timeseries_{run_key}")
def print_and_save_table(
    sys_ref: dict, idle_ref: dict, pipeline_overhead: dict, results: list
):
    """
    Print the reference and overhead summary and write it to disk.

    Prints:
    - the reference medians and P95s;
    - the pipeline overhead together with the REQ-NF01 check
      (``cpu_overhead_p95 < 10.0``);
    - a per-profile transfer-overhead table.

    The table is written to ``OUTPUT_DIR / "03_resource_table.csv"``. A
    plain-text summary is written to ``OUTPUT_DIR / "03_resource_stats.txt"``
    (UTF-8, since the text contains characters such as "±"). Empty entries
    (``{}``) in *results* are skipped.
    """
    print_section("Referenz-Messungen")
    print(
        f" system_baseline: "
        f"CPU={sys_ref['cpu_median']:.2f}% (P95={sys_ref['cpu_p95']:.2f}%), "
        f"RAM={sys_ref['ram_median']:.0f} MB (P95={sys_ref['ram_p95']:.0f} MB)"
    )
    print(
        f" pipeline_idle_baseline: "
        f"CPU={idle_ref['cpu_median']:.2f}% (P95={idle_ref['cpu_p95']:.2f}%), "
        f"RAM={idle_ref['ram_median']:.0f} MB (P95={idle_ref['ram_p95']:.0f} MB)"
    )

    print_section("Pipeline-Overhead (idle - system) → REQ-NF01")
    cpu_ov_med = pipeline_overhead["cpu_overhead_median"]
    cpu_ov_p95 = pipeline_overhead["cpu_overhead_p95"]
    ram_ov_med = pipeline_overhead["ram_overhead_median"]
    ram_ov_p95 = pipeline_overhead["ram_overhead_p95"]
    print(f" CPU Overhead: Median={cpu_ov_med:+.2f}%, P95={cpu_ov_p95:+.2f}%")
    print(f" RAM Overhead: Median={ram_ov_med:+.1f} MB, P95={ram_ov_p95:+.1f} MB")
    # REQ-NF01 is checked against the P95 CPU overhead, not the median.
    req_nf01 = cpu_ov_p95 < 10.0
    print(
        f" REQ-NF01 (Pipeline-CPU-Overhead < 10%): "
        f"{'OK' if req_nf01 else 'VERLETZT'} (P95={cpu_ov_p95:.2f}%)"
    )

    print_section(
        "Transfer-Overhead pro Workload-Profil (workload_baseline - pipeline_idle)"
    )
    rows = [
        {
            "Profil": r["profile"],
            "CPU Stab. (Med±Std)": r["cpu_stability"],
            "RAM Stab. (Med±Std)": r["ram_stability"],
            "CPU Transfer Med.(%)": f"{r['cpu_transfer_med']:+.2f}",
            "CPU Transfer P95(%)": f"{r['cpu_transfer_p95']:+.2f}",
            "RAM Transfer Med.(MB)": f"{r['ram_transfer_med']:+.1f}",
            "RAM Transfer P95(MB)": f"{r['ram_transfer_p95']:+.1f}",
        }
        for r in results
        if r
    ]
    tdf = pd.DataFrame(rows)
    print(tdf.to_string(index=False))
    tdf.to_csv(OUTPUT_DIR / "03_resource_table.csv", index=False)

    with open(OUTPUT_DIR / "03_resource_stats.txt", "w", encoding="utf-8") as f:
        f.write("Ressourcenverbrauch-Analyse (differenzbasiert)\n")
        f.write("=" * 65 + "\n\n")
        f.write(
            f"system_baseline: CPU={sys_ref['cpu_median']:.2f}%, "
            f"RAM={sys_ref['ram_median']:.0f} MB\n"
        )
        f.write(
            f"pipeline_idle: CPU={idle_ref['cpu_median']:.2f}%, "
            f"RAM={idle_ref['ram_median']:.0f} MB\n\n"
        )
        f.write("Pipeline-Overhead (idle - system):\n")
        f.write(f" CPU: Median={cpu_ov_med:+.2f}%, P95={cpu_ov_p95:+.2f}%\n")
        f.write(f" RAM: Median={ram_ov_med:+.1f} MB, P95={ram_ov_p95:+.1f} MB\n")
        f.write(f" REQ-NF01 (CPU Overhead): {'OK' if req_nf01 else 'VERLETZT'}\n\n")
        f.write("Transfer-Overhead pro Profil:\n")
        f.write(tdf.to_string(index=False) + "\n")
def main():
    """
    Run the resource-overhead evaluation end to end.

    Steps:
    1. Read ``injection_windows.csv`` and ``baseline_windows.csv`` from
       ``OUTPUT_DIR`` and normalise their time columns to UTC.
    2. Load the system and pipeline-idle reference measurements and derive
       the pipeline overhead from them.
    3. Analyse every profile in ``WORKLOAD_PROFILES``; plot the CPU time
       series of each profile's first run.
    4. Write the summary table, the stats file and the stacked plot.

    Returns early, after printing an error, if a reference measurement file
    is missing.
    """
    print_section("03 Ressourcenverbrauch (REQ-NF01, NF04, NF05)")
    print(" Methode: Differenz-basierte Overhead-Schichten")
    print(" Pipeline-Overhead = pipeline_idle - system_baseline")
    print(" Transfer-Overhead = workload_baseline - pipeline_idle")
    print(" Chaos-Impact = workload_injection - workload_baseline\n")

    windows_df = pd.read_csv(
        OUTPUT_DIR / "injection_windows.csv", parse_dates=["t_inj", "t_stop"]
    )
    baseline_windows = pd.read_csv(
        OUTPUT_DIR / "baseline_windows.csv", parse_dates=["t_start", "t_end"]
    )
    for col in ["t_inj", "t_stop"]:
        windows_df[col] = windows_df[col].apply(_to_utc)
    for col in ["t_start", "t_end"]:
        baseline_windows[col] = baseline_windows[col].apply(_to_utc)

    try:
        sys_df = load_extra_baseline(SYSTEM_BASELINE_DIR)
        idle_df = load_extra_baseline(IDLE_BASELINE_DIR)
    except FileNotFoundError as e:
        print(f" FEHLER: {e}")
        return

    sys_ref = compute_reference_stats(sys_df)
    idle_ref = compute_reference_stats(idle_df)
    # The "p95" overhead compares the idle P95 against the *median* of the
    # system baseline, not against the system P95.
    pipeline_overhead = {
        "cpu_overhead_median": idle_ref["cpu_median"] - sys_ref["cpu_median"],
        "cpu_overhead_p95": idle_ref["cpu_p95"] - sys_ref["cpu_median"],
        "ram_overhead_median": idle_ref["ram_median"] - sys_ref["ram_median"],
        "ram_overhead_p95": idle_ref["ram_p95"] - sys_ref["ram_median"],
    }
    print(
        f" system_baseline: CPU={sys_ref['cpu_median']:.2f}%, "
        f"RAM={sys_ref['ram_median']:.0f} MB"
    )
    print(
        f" pipeline_idle: CPU={idle_ref['cpu_median']:.2f}%, "
        f"RAM={idle_ref['ram_median']:.0f} MB"
    )
    print(
        f" Pipeline-Overhead CPU: {pipeline_overhead['cpu_overhead_median']:+.2f}% "
        f"(P95={pipeline_overhead['cpu_overhead_p95']:+.2f}%)"
    )
    print(
        f" Pipeline-Overhead RAM: {pipeline_overhead['ram_overhead_median']:+.1f} MB "
        f"(P95={pipeline_overhead['ram_overhead_p95']:+.1f} MB)\n"
    )

    results = []
    for profile, run_keys in WORKLOAD_PROFILES.items():
        print(f" Profil: {profile}")
        r = analyze_profile(
            profile, run_keys, windows_df, baseline_windows, sys_ref, idle_ref
        )
        results.append(r)
        if r:
            print(
                f" Transfer-Overhead CPU: {r['cpu_transfer_med']:+.2f}% "
                f"(P95={r['cpu_transfer_p95']:+.2f}%)"
            )
            print(
                f" Transfer-Overhead RAM: {r['ram_transfer_med']:+.1f} MB "
                f"(P95={r['ram_transfer_p95']:+.1f} MB)"
            )
        else:
            print(" → keine ausreichenden Baseline-Daten (< 5 Samples)")
        # A profile configured without runs has no first run to plot.
        if run_keys:
            plot_cpu_overhead_timeseries(run_keys[0], windows_df, sys_ref, idle_ref)

    print_and_save_table(sys_ref, idle_ref, pipeline_overhead, results)
    plot_stacked_overhead(sys_ref, idle_ref, pipeline_overhead, results)
    print(
        f"\n→ Gespeichert: 03_resource_table.csv, "
        f"03_resource_overhead_stacked.pdf/png, "
        f"03_cpu_overhead_timeseries_*.pdf/png"
    )
if __name__ == "__main__":
main()