package main import ( "context" "fmt" "log" "sync" "time" ) type ScenarioType string const ( TypeNetwork ScenarioType = "network" TypeResource ScenarioType = "resource" ) type InjectionMode string const ( InjectionContinuous InjectionMode = "continuous" InjectionPulsed InjectionMode = "pulsed" ) type Command struct { Host string Command string } type ScenarioDef struct { ID string Description string ChaosHypothesis string RealWorldAnalog string Type ScenarioType InjectionMode InjectionMode PulseDuration time.Duration PauseDuration time.Duration StartCmds []Command StopCmds []Command IsFlapping bool EnableRecoveryRestart bool RecoveryRestartHosts []string } const tixstreamServiceUnit = "tixstream" const tjmServiceUnit = "transfer-job-manager" func tixstreamRestart() string { return fmt.Sprintf("sudo systemctl restart %s", tixstreamServiceUnit) } func tjmRestart() string { return fmt.Sprintf("sudo systemctl restart %s", tjmServiceUnit) } type ScenarioManager struct { Inventory *Inventory SSHManager map[string]*SSHClient ActiveScenario *ScenarioDef mu sync.Mutex logFn func(action, source, target, workloadType string, isAnomaly bool) cancelFlap context.CancelFunc cancelPulse context.CancelFunc } func NewScenarioManager(inv *Inventory, logFn func(string, string, string, string, bool)) (*ScenarioManager, error) { sm := &ScenarioManager{ Inventory: inv, SSHManager: make(map[string]*SSHClient), logFn: logFn, } client, err := NewSSHClient(inv.Router) if err != nil { return nil, fmt.Errorf("failed to connect to router: %v", err) } sm.SSHManager["router"] = client for name, config := range inv.Workers { client, err := NewSSHClient(config) if err != nil { log.Printf("Warning: failed to connect to worker %s: %v", name, err) continue } sm.SSHManager[name] = client } return sm, nil } func (sm *ScenarioManager) GetScenarios() []ScenarioDef { return []ScenarioDef{ // ── NETWORK SCENARIOS ────────────────────────────────────────────────── { ID: "slow-connection", Description: "Niedrige Bandbreite + hohe Latenz (Satellit / WAN-Link)", ChaosHypothesis: "Wir nehmen an, dass tixstream bei 50 Mbit/s und 300–400 ms RTT " + "weiterhin Transfers abschließt, jedoch mit messbar reduziertem Durchsatz " + "und erhöhter Transfer-Dauer.", RealWorldAnalog: "WAN-Verbindung zwischen zwei DTN-Sites über Satellit oder " + "stark ausgelasteten Backbone-Link (typisch in wissenschaftlichen Campusnetzen).", Type: TypeNetwork, InjectionMode: InjectionContinuous, StartCmds: []Command{ {"router", "sudo tc qdisc add dev ens4 root tbf rate 30mbit burst 128kbit latency 500ms"}, {"router", "sudo tc qdisc add dev ens5 root tbf rate 30mbit burst 128kbit latency 500ms"}, {"router", "sudo tc qdisc add dev ens6 root handle 1: netem delay 300ms"}, {"router", "sudo tc qdisc add dev ens6 parent 1:1 handle 10: tbf rate 30mbit burst 128kbit latency 500ms"}, }, StopCmds: []Command{ {"router", "sudo tc qdisc del dev ens4 root || true"}, {"router", "sudo tc qdisc del dev ens5 root || true"}, {"router", "sudo tc qdisc del dev ens6 root || true"}, }, }, { ID: "high-latency", Description: "Hohe Latenz (150–250 ms RTT)", ChaosHypothesis: "Wir nehmen an, dass tixstream bei 150–250 ms RTT (mit ±10–20 ms Jitter) " + "stabile Verbindungen aufrecht erhält, jedoch Protokoll-Timeouts und " + "Retransmissions sichtbar werden.", RealWorldAnalog: "Intercontinental DTN-Transfers (z.B. DE ↔ US), " + "oder Routing über congested Peering-Points.", Type: TypeNetwork, InjectionMode: InjectionContinuous, StartCmds: []Command{ {"router", "sudo tc qdisc add dev ens4 root netem delay 250ms 20ms distribution normal"}, {"router", "sudo tc qdisc add dev ens5 root netem delay 250ms 20ms distribution normal"}, {"router", "sudo tc qdisc add dev ens6 root netem delay 350ms 40ms distribution normal"}, }, StopCmds: []Command{ {"router", "sudo tc qdisc del dev ens4 root || true"}, {"router", "sudo tc qdisc del dev ens5 root || true"}, {"router", "sudo tc qdisc del dev ens6 root || true"}, }, }, { ID: "packet-loss", Description: "Paketverlust (1–3%)", ChaosHypothesis: "Wir nehmen an, dass tixstream bei 3% Paketverlust auf dem Datenpfad " + "Retransmissions durchführt und Transfers abschließt, ohne in einen " + "Fehler-Zustand zu geraten, bei 1% marginal messbare Auswirkungen zeigt.", RealWorldAnalog: "Fehlerhafte Transceiver, überlastete Switch-Ports, oder " + "Wireless-Backhaul mit schlechter Signalqualität im ScienceDMZ.", Type: TypeNetwork, InjectionMode: InjectionContinuous, StartCmds: []Command{ {"router", "sudo tc qdisc add dev ens4 root netem loss 5% 30%"}, {"router", "sudo tc qdisc add dev ens5 root netem loss 5% 30%"}, {"router", "sudo tc qdisc add dev ens6 root netem loss 3% 30%"}, }, StopCmds: []Command{ {"router", "sudo tc qdisc del dev ens4 root || true"}, {"router", "sudo tc qdisc del dev ens5 root || true"}, {"router", "sudo tc qdisc del dev ens6 root || true"}, }, }, { ID: "congestion", Description: "Netzüberlastung (Delay + Loss + Rate Limit kombiniert)", ChaosHypothesis: "Wir nehmen an, dass tixstream unter kombinierter Überlastung " + "(50 ms Delay, 1% Loss, 80 Mbit/s Rate Limit) Transfers verlangsamt, " + "jedoch keine Verbindungsabbrüche erzeugt.", RealWorldAnalog: "Shared-Use-Link im ScienceDMZ unter gleichzeitigem Bulk-Transfer " + "mehrerer Nutzer; typisch bei BigData-Experimenten ohne Traffic-Shaping.", Type: TypeNetwork, InjectionMode: InjectionContinuous, StartCmds: []Command{ {"router", "sudo tc qdisc add dev ens4 root handle 1: netem delay 50ms 10ms loss 3% 25%"}, {"router", "sudo tc qdisc add dev ens4 parent 1:1 handle 10: tbf rate 40mbit burst 128kbit latency 100ms"}, {"router", "sudo tc qdisc add dev ens5 root handle 1: netem delay 50ms 10ms loss 3% 25%"}, {"router", "sudo tc qdisc add dev ens5 parent 1:1 handle 10: tbf rate 40mbit burst 128kbit latency 100ms"}, }, StopCmds: []Command{ {"router", "sudo tc qdisc del dev ens4 root || true"}, {"router", "sudo tc qdisc del dev ens5 root || true"}, }, }, { ID: "partial-outage", Description: "Partielle Netzunterbrechung (W0 ↔ W1 Pfad gesperrt)", ChaosHypothesis: "Wir nehmen an, dass tixstream bei vollständiger Unterbrechung des " + "W0↔W1-Pfades Transfers auf diesem Pfad als fehlgeschlagen markiert, " + "den W0↔W2-Pfad jedoch nicht beeinflusst.", RealWorldAnalog: "Ausfall eines Switch-Ports oder einer Firewall-Regel im ScienceDMZ, " + "die einen spezifischen DTN-zu-DTN-Pfad blockiert.", Type: TypeNetwork, InjectionMode: InjectionContinuous, StartCmds: []Command{ {"router", "sudo iptables -A FORWARD -i ens4 -o ens5 -j DROP"}, {"router", "sudo iptables -A FORWARD -i ens5 -o ens4 -j DROP"}, {"router", "sudo tc qdisc add dev ens6 root netem delay 10ms"}, }, StopCmds: []Command{ {"router", "sudo iptables -D FORWARD -i ens4 -o ens5 -j DROP || true"}, {"router", "sudo iptables -D FORWARD -i ens5 -o ens4 -j DROP || true"}, {"router", "sudo tc qdisc del dev ens6 root || true"}, }, }, { ID: "flapping", Description: "Verbindungsinstabilität / Wackelkontakt (30s Periode)", ChaosHypothesis: "Wir nehmen an, dass tixstream bei periodisch unterbrochenen " + "Netzverbindungen (100% Loss für 30s, dann normal für 30s) " + "Reconnect-Mechanismen aktiviert und Transfers nach Wiederherstellung " + "der Verbindung fortsetzt.", RealWorldAnalog: "Instabile optische Verbindung, failing SFP-Transceiver, oder " + "BGP-Route-Flapping zwischen zwei DTN-Sites.", Type: TypeNetwork, InjectionMode: InjectionContinuous, // Flapping has its own goroutine IsFlapping: true, StartCmds: []Command{ {"router", "sudo tc qdisc add dev ens4 root netem loss 100%"}, {"router", "sudo tc qdisc add dev ens5 root netem loss 100%"}, }, StopCmds: []Command{ {"router", "sudo tc qdisc del dev ens4 root || true"}, {"router", "sudo tc qdisc del dev ens5 root || true"}, }, }, { ID: "cpu-stress", Description: "CPU-Sättigung auf W0 (pulsed, 4 Kerne, moderate Last)", ChaosHypothesis: "Wir nehmen an, dass tixstream bei temporärer CPU-Sättigung " + "(4 Kerne für 45s) messbar erhöhte Transfer-Latenz zeigt, sich aber " + "nach Ende der Lastphase vollständig erholt.", RealWorldAnalog: "Konkurrierende wissenschaftliche Rechenaufgabe auf einem DTN " + "(z.B. On-the-fly Kompression, Checksummen-Validierung großer Datasets), " + "wie sie im ScienceDMZ-Kontext bei fehlender CPU-Reservierung auftreten.", Type: TypeResource, InjectionMode: InjectionPulsed, PulseDuration: 45 * time.Second, PauseDuration: 55 * time.Second, StartCmds: []Command{ {"thesis-worker-0", "nohup stress-ng --cpu 4 --cpu-method matrix --timeout 50s > /dev/null 2>&1 &"}, }, StopCmds: []Command{ {"thesis-worker-0", "pkill -f stress-ng || true"}, }, EnableRecoveryRestart: false, }, { ID: "io-stress", Description: "Disk I/O Bottleneck auf W0 (pulsed, gemäßigt)", ChaosHypothesis: "Wir nehmen an, dass tixstream bei kurzzeitiger Disk-I/O-Sättigung " + "(30s) Schreib-Timeouts zeigt und transferierte Dateien mit Verzögerung " + "schreibt, sich nach Ende der I/O-Last jedoch ohne Neustart erholt.", RealWorldAnalog: "Paralleler Zugriff auf denselben Storage-Backend durch " + "Archivierungs- oder Indexierungsprozesse auf einem DTN; typisch bei " + "POSIX-Shared-Storage ohne I/O-Scheduling.", Type: TypeResource, InjectionMode: InjectionPulsed, PulseDuration: 35 * time.Second, PauseDuration: 60 * time.Second, StartCmds: []Command{ {"thesis-worker-0", "nohup stress-ng --io 3 --hdd 1 --hdd-bytes 512m --timeout 40s > /dev/null 2>&1 &"}, }, StopCmds: []Command{ {"thesis-worker-0", "pkill -f stress-ng || true"}, }, EnableRecoveryRestart: true, RecoveryRestartHosts: []string{"thesis-worker-0"}, }, { ID: "mem-stress", Description: "Memory Pressure auf W0 (pulsed, 2.0 GB)", ChaosHypothesis: "Wir nehmen an, dass tixstream unter moderatem Speicherdruck (2.0 GB) " + "messbar erhöhte Latenz durch erhöhte Page-Fault-Rate zeigt, " + "jedoch nicht vom OOM-Killer beendet wird.", RealWorldAnalog: "In-Memory-Verarbeitung (z.B. Metadaten-Extraktion, " + "Stream-Processing) eines konkurrierenden Prozesses auf einem DTN " + "mit begrenztem RAM.", Type: TypeResource, InjectionMode: InjectionPulsed, PulseDuration: 30 * time.Second, PauseDuration: 60 * time.Second, StartCmds: []Command{ {"thesis-worker-0", "nohup stress-ng --vm 1 --vm-bytes 2000m --vm-keep --timeout 35s > /dev/null 2>&1 &"}, }, StopCmds: []Command{ {"thesis-worker-0", "pkill -f stress-ng || true"}, }, EnableRecoveryRestart: true, RecoveryRestartHosts: []string{"thesis-worker-0"}, }, } } func (sm *ScenarioManager) StartScenario(id string) error { sm.mu.Lock() defer sm.mu.Unlock() if sm.ActiveScenario != nil { sm.stopActiveScenario() } scenarios := sm.GetScenarios() var target *ScenarioDef for _, s := range scenarios { if s.ID == id { target = &s break } } if target == nil { return fmt.Errorf("scenario %s not found", id) } switch { case target.IsFlapping: ctx, cancel := context.WithCancel(context.Background()) sm.cancelFlap = cancel go sm.runFlapping(ctx, target) case target.InjectionMode == InjectionPulsed: ctx, cancel := context.WithCancel(context.Background()) sm.cancelPulse = cancel go sm.runPulsed(ctx, target) default: for _, cmd := range target.StartCmds { client, ok := sm.SSHManager[cmd.Host] if !ok { log.Printf("Host %s not found in SSH manager", cmd.Host) continue } if _, err := client.RunCommand(cmd.Command); err != nil { log.Printf("Error running start command on %s: %v", cmd.Host, err) } } } sm.ActiveScenario = target sm.logFn("START_ANOMALY", "Orchestrator", "System", target.ID, true) return nil } func (sm *ScenarioManager) StopScenario() { sm.mu.Lock() defer sm.mu.Unlock() sm.stopActiveScenario() } func (sm *ScenarioManager) stopActiveScenario() { if sm.ActiveScenario == nil { return } if sm.cancelFlap != nil { sm.cancelFlap() sm.cancelFlap = nil } if sm.cancelPulse != nil { sm.cancelPulse() sm.cancelPulse = nil } for _, cmd := range sm.ActiveScenario.StopCmds { client, ok := sm.SSHManager[cmd.Host] if !ok { continue } if _, err := client.RunCommand(cmd.Command); err != nil { log.Printf("Error running stop command on %s: %v", cmd.Host, err) } } sm.logFn("STOP_ANOMALY", "Orchestrator", "System", sm.ActiveScenario.ID, true) if sm.ActiveScenario.EnableRecoveryRestart { log.Printf("Recovery restart triggered for scenario %s", sm.ActiveScenario.ID) sm.logFn("RECOVERY_RESTART", "Orchestrator", "System", sm.ActiveScenario.ID, false) time.Sleep(3 * time.Second) for _, host := range sm.ActiveScenario.RecoveryRestartHosts { client, ok := sm.SSHManager[host] if !ok { log.Printf("Recovery restart: host %s not found", host) continue } out, err := client.RunCommand(tixstreamRestart()) if err != nil { log.Printf("Recovery restart failed on %s: %v (output: %s)", host, err, out) } else { log.Printf("Recovery restart successful on %s", host) } } time.Sleep(10 * time.Second) } sm.ActiveScenario = nil } func (sm *ScenarioManager) runPulsed(ctx context.Context, s *ScenarioDef) { log.Printf("Pulsed injection started for scenario %s (pulse=%s pause=%s)", s.ID, s.PulseDuration, s.PauseDuration) runCmds := func(cmds []Command) { for _, cmd := range cmds { client, ok := sm.SSHManager[cmd.Host] if !ok { continue } if _, err := client.RunCommand(cmd.Command); err != nil { log.Printf("Pulsed cmd error on %s: %v", cmd.Host, err) } } } for { sm.logFn("PULSE_ON", "Orchestrator", "System", s.ID, true) runCmds(s.StartCmds) select { case <-ctx.Done(): runCmds(s.StopCmds) return case <-time.After(s.PulseDuration): } sm.logFn("PULSE_OFF", "Orchestrator", "System", s.ID, true) runCmds(s.StopCmds) select { case <-ctx.Done(): return case <-time.After(s.PauseDuration): } } } func (sm *ScenarioManager) runFlapping(ctx context.Context, s *ScenarioDef) { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() active := false for { select { case <-ctx.Done(): return case <-ticker.C: active = !active cmds := s.StopCmds action := "FLAP_OFF" if active { cmds = s.StartCmds action = "FLAP_ON" } sm.logFn(action, "Orchestrator", "System", s.ID, true) for _, cmd := range cmds { client, ok := sm.SSHManager[cmd.Host] if !ok { continue } client.RunCommand(cmd.Command) } } } }