// Command main drives transfer workloads against a set of worker nodes, can
// optionally inject chaos scenarios through a ScenarioManager, and records
// every phase transition in a ground-truth CSV file.
package main

import (
	"bytes"
	"context"
	"encoding/csv"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
	"time"
)

const (
	// Port is the TCP port of the transfer-job-manager API on each worker.
	Port = "60000"
	// APIPort is the TCP port this program's own control API listens on.
	APIPort = "60001"
	// AuthHeader is sent verbatim as the Authorization header to the workers.
	// NOTE(review): this is a hard-coded Basic credential; consider loading it
	// from the environment or a secrets file instead of the source code.
	AuthHeader = "Basic YWRtaW46dmVyeXNlY3JldA=="
	// LogFile is the path of the ground-truth CSV (opened in append mode).
	LogFile = "./thesis_scenario_ground_truth.csv"
	// IncomingDir is the remote directory pruned by cleanupRemote.
	IncomingDir    = "/local_testdata/test-destination"
	WorkerPoolSize = 10
	// MaxRetries is the number of POST attempts made by tryTriggerJob.
	MaxRetries = 3
	// RetryBackoff is the base delay between attempts; tryTriggerJob
	// multiplies it by the attempt index.
	RetryBackoff = 2 * time.Second

	// MFTStatusCompleted is the status code treated as successful completion.
	MFTStatusCompleted = 870
	// MFTStatusTemporaryError is the status code treated as a temporary
	// failure: polling continues and waits for the remote side to retry.
	MFTStatusTemporaryError = 880
	// MFTStatusFailed is the lowest status code treated as a terminal failure.
	MFTStatusFailed = 900

	// How long to wait for a single job before giving up
	JobCompletionTimeout = 5 * time.Minute
)

// ScenarioMode selects the overall operating mode of the generator.
type ScenarioMode string

const (
	ModeTraining    ScenarioMode = "training"
	ModeTesting     ScenarioMode = "testing"
	ModeAnomalyOnly ScenarioMode = "anomaly-only"
)

// ExperimentConfig holds the full experiment timing configuration.
// Used by the built-in experiment runner (--run-experiment flag).
type ExperimentConfig struct {
	BaselineDuration time.Duration
	ChaosDuration    time.Duration
	CooldownDuration time.Duration
	Scenarios        []string // if empty, all scenarios are used
}

// Worker identifies one worker node: a display name, the host used to reach
// it, and the ID under which it is looked up in the inventory.
type Worker struct {
	Name string
	Host string
	ID   string
}

// TransferJob is the JSON payload POSTed to the transfer-job-manager API.
type TransferJob struct {
	Description          string   `json:"description"`
	Schedule             string   `json:"schedule"`
	DestinationSystem    string   `json:"destination_system"`
	DestinationShare     string   `json:"destination_share"`
	SourceFileURIs       []string `json:"source_file_uris"`
	WaitForPublish       bool     `json:"wait_for_publish"`
	FlatFileModeDisabled bool     `json:"flat_file_mode_disabled"`
}

// MFTJobResponse is the JSON document decoded from the transfer-job-manager
// API, both when a job is submitted and when its status is polled.
type MFTJobResponse struct {
	ID            int    `json:"id"`
	JobID         string `json:"job_id"`
	StatusCode    int    `json:"status_code"`
	StatusMessage string `json:"status_message"`
	StatusDetail  string `json:"status_detail"`
}

// JobRequest describes one transfer to trigger. FileURIs takes precedence;
// FileURI is only used when FileURIs is empty.
type JobRequest struct {
	SourceWorker Worker
	DestWorker   Worker
	Schedule     string
	FileURI      string
	FileURIs     []string
	Description  string
	WorkloadType string
	IsAnomaly    bool
}

// WorkloadFilter restricts the workload cycle to a single phase ("all"
// disables the restriction, as described by the --workload flag).
type WorkloadFilter string

const (
	WorkloadAll          WorkloadFilter = "all"
	WorkloadHighBW       WorkloadFilter = "high-bw"
	WorkloadHighIOPS     WorkloadFilter = "high-iops"
	WorkloadInterference WorkloadFilter = "interference"
	WorkloadBatchOut     WorkloadFilter = "batch-out"
	WorkloadBatchIn      WorkloadFilter = "batch-in"
	WorkloadIdle         WorkloadFilter = "idle"
)

// Config carries the settings parsed from the command line.
type Config struct {
	Mode              ScenarioMode
	MonitoredWorker   string
	EnableBurst       bool
	EnableMaintenance bool
	AnomalyRate       float64
	RunExperiment     bool
	Experiment        ExperimentConfig
	WorkloadFilter    WorkloadFilter
}

// ScenarioGenerator triggers transfer jobs on the workers and writes the
// ground-truth log. logMutex serialises access to logWriter/logFile.
type ScenarioGenerator struct {
	workers    []Worker
	config     Config
	logWriter  *csv.Writer
	logFile    *os.File
	logMutex   sync.Mutex
	httpClient *http.Client
	rand       *rand.Rand
	randMutex  sync.Mutex
	jobQueue   chan JobRequest
	wg         sync.WaitGroup
	sm         *ScenarioManager
}

// NewScenarioGenerator builds a generator for the three workers
// "thesis-worker-0" .. "thesis-worker-2" and opens the ground-truth CSV in
// append mode, writing the header row when the file is empty.
//
// For each worker the host defaults to the worker ID and is replaced by the
// inventory entry's IP when inv is non-nil and that IP is non-empty.
// The returned generator has no ScenarioManager; the caller sets sm.
func NewScenarioGenerator(config Config, inv *Inventory) (*ScenarioGenerator, error) {
	const workerCount = 3

	workers := make([]Worker, 0, workerCount)
	for i := range workerCount {
		id := fmt.Sprintf("thesis-worker-%d", i)
		host := id
		if inv != nil {
			if h, ok := inv.Workers[id]; ok && h.IP != "" {
				host = h.IP
			}
		}
		workers = append(workers, Worker{
			Name: fmt.Sprintf("Worker Node %d", i),
			Host: host,
			ID:   id,
		})
	}

	logFile, err := os.OpenFile(LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		return nil, fmt.Errorf("kann Log-Datei nicht öffnen: %w", err)
	}

	// A failed Stat returns a nil FileInfo; check it instead of dereferencing.
	stat, err := logFile.Stat()
	if err != nil {
		logFile.Close()
		return nil, fmt.Errorf("kann Log-Datei nicht prüfen: %w", err)
	}

	writer := csv.NewWriter(logFile)
	if stat.Size() == 0 {
		if err := writer.Write([]string{
			"timestamp", "action", "source", "target",
			"workload_type", "is_anomaly", "scenario_mode",
		}); err != nil {
			logFile.Close()
			return nil, fmt.Errorf("kann Header nicht schreiben: %w", err)
		}
		// Flush has no return value; I/O failures surface through Error.
		writer.Flush()
		if err := writer.Error(); err != nil {
			logFile.Close()
			return nil, fmt.Errorf("kann Header nicht schreiben: %w", err)
		}
	}

	return &ScenarioGenerator{
		workers:    workers,
		config:     config,
		logWriter:  writer,
		logFile:    logFile,
		httpClient: &http.Client{Timeout: 30 * time.Second},
		rand:       rand.New(rand.NewSource(time.Now().UnixNano())),
		jobQueue:   make(chan JobRequest, 100),
		sm:         nil,
	}, nil
}

// Close flushes the ground-truth writer and closes the log file. The file is
// closed even when the flush failed; the flush error takes precedence in the
// returned value.
func (sg *ScenarioGenerator) Close() error {
	sg.logMutex.Lock()
	defer sg.logMutex.Unlock()

	sg.logWriter.Flush()
	flushErr := sg.logWriter.Error()
	closeErr := sg.logFile.Close()
	if flushErr != nil {
		return flushErr
	}
	return closeErr
}

// logGroundTruth appends one row (timestamp, action, source, target,
// workload type, anomaly flag, scenario mode) to the ground-truth CSV,
// flushes it immediately, and mirrors the event on the standard logger.
// It is safe for concurrent use; failures are logged, not returned.
func (sg *ScenarioGenerator) logGroundTruth(action, source, target, workloadType string, isAnomaly bool) {
	sg.logMutex.Lock()
	defer sg.logMutex.Unlock()

	timestamp := time.Now().Format("2006-01-02 15:04:05")
	anomalyStr := "0"
	anomalyMarker := ""
	if isAnomaly {
		anomalyStr = "1"
		anomalyMarker = " [ANOMALY]"
	}

	if err := sg.logWriter.Write([]string{
		timestamp, action, source, target, workloadType, anomalyStr, string(sg.config.Mode),
	}); err != nil {
		log.Printf("Fehler beim Schreiben in Log: %v", err)
		return
	}
	// Flush reports nothing itself; without the Error check a failed write to
	// disk would drop the row silently.
	sg.logWriter.Flush()
	if err := sg.logWriter.Error(); err != nil {
		log.Printf("Fehler beim Schreiben in Log: %v", err)
	}

	log.Printf("[GT] %s | %s: %s -> %s (%s)%s", timestamp, action, source, target, workloadType, anomalyMarker)
}

// cleanupRemote deletes regular files older than ten minutes (-mmin +10)
// below IncomingDir on the monitored worker, using the SSH client registered
// for that worker in the ScenarioManager. Failures are only logged.
func (sg *ScenarioGenerator) cleanupRemote() {
	monitoredHost := sg.config.MonitoredWorker
	log.Printf("Bereinige Incoming-Ordner auf %s (via SSH)...", monitoredHost)

	cmd := fmt.Sprintf("sudo find %s -type f -mmin +10 -delete", IncomingDir)
	client, ok := sg.sm.SSHManager[monitoredHost]
	if !ok {
		log.Printf("Fehler: Kein SSH-Client für %s gefunden", monitoredHost)
		return
	}

	out, err := client.RunCommand(cmd)
	if err != nil {
		log.Printf("Remote Cleanup fehlgeschlagen: %v (Output: %s)", err, out)
		return
	}
	log.Println("Remote Cleanup erfolgreich abgeschlossen.")
}

// waitForAPI polls the worker's job endpoint every ten seconds until it
// answers with HTTP 200 or ctx is done. The first probe happens after the
// first tick, i.e. ten seconds after the call. It returns nil once the API
// answered and ctx.Err() when the context ended first.
func (sg *ScenarioGenerator) waitForAPI(ctx context.Context, worker Worker) error {
	apiURL := fmt.Sprintf("http://%s:%s/transfer-job-manager/v1/jobs", worker.Host, Port)
	log.Printf("Prüfe Verfügbarkeit von %s (%s)...", worker.Name, worker.Host)

	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			req, err := http.NewRequestWithContext(ctx, http.MethodGet, apiURL, nil)
			if err != nil {
				continue
			}
			req.Header.Set("Authorization", AuthHeader)

			resp, err := sg.httpClient.Do(req)
			if err == nil && resp.StatusCode == http.StatusOK {
				resp.Body.Close()
				log.Printf("%s ist ONLINE", worker.Name)
				return nil
			}
			if resp != nil {
				resp.Body.Close()
			}
			log.Printf("Warte auf %s... Retry in 10s", worker.Name)
		}
	}
}

// triggerJobWithRetry submits req and returns the job ID. When a full round
// of tryTriggerJob attempts fails, it runs a service health check on the
// source worker and tries again, for at most two such recovery rounds.
func (sg *ScenarioGenerator) triggerJobWithRetry(ctx context.Context, req JobRequest) (string, error) {
	const maxServiceRecoveryAttempts = 2

	for recoveryAttempt := range maxServiceRecoveryAttempts + 1 {
		jobID, err := sg.tryTriggerJob(ctx, req)
		if err == nil {
			return jobID, nil
		}
		if recoveryAttempt >= maxServiceRecoveryAttempts {
			return "", fmt.Errorf("job failed after service recovery: %w", err)
		}
		log.Printf("[TRIGGER] Alle %d Versuche fehlgeschlagen, starte Health-Check auf %s...", MaxRetries, req.SourceWorker.Host)
		sg.checkAndRestartWorkerServices(req.SourceWorker)
	}
	// Not reached: the last loop iteration always returns.
	return "", fmt.Errorf("job trigger fehlgeschlagen nach Service-Recovery")
}

// tryTriggerJob performs up to MaxRetries POST attempts without any service
// recovery. Between attempts it waits RetryBackoff multiplied by the attempt
// index, aborting with ctx.Err() if the context ends during the wait.
// On a 2xx response it returns the job_id from the decoded body.
//
// NOTE(review): the payload always uses Schedule "NOW"; req.Schedule is not
// read here — confirm that this is intended.
func (sg *ScenarioGenerator) tryTriggerJob(ctx context.Context, req JobRequest) (string, error) {
	// The payload and URL do not change between attempts, so build them once.
	sourceFiles := req.FileURIs
	if len(sourceFiles) == 0 && req.FileURI != "" {
		sourceFiles = []string{req.FileURI}
	}
	jsonData, err := json.Marshal(TransferJob{
		Description:          req.Description,
		Schedule:             "NOW",
		DestinationSystem:    req.DestWorker.Name,
		DestinationShare:     "local_sync_destination",
		SourceFileURIs:       sourceFiles,
		WaitForPublish:       false,
		FlatFileModeDisabled: false,
	})
	if err != nil {
		return "", fmt.Errorf("marshal error: %w", err)
	}
	apiURL := fmt.Sprintf("http://%s:%s/transfer-job-manager/v1/jobs", req.SourceWorker.Host, Port)

	var lastErr error
	for attempt := range MaxRetries {
		if attempt > 0 {
			select {
			case <-ctx.Done():
				return "", ctx.Err()
			case <-time.After(RetryBackoff * time.Duration(attempt)):
			}
		}

		httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, apiURL, bytes.NewReader(jsonData))
		if err != nil {
			return "", fmt.Errorf("request creation error: %w", err)
		}
		httpReq.Header.Set("Authorization", AuthHeader)
		httpReq.Header.Set("Content-Type", "application/json")

		resp, err := sg.httpClient.Do(httpReq)
		if err != nil {
			lastErr = err
			log.Printf("Fehler bei Job-Trigger (Versuch %d/%d): %v", attempt+1, MaxRetries, err)
			continue
		}

		if resp.StatusCode >= 200 && resp.StatusCode < 300 {
			var result MFTJobResponse
			decodeErr := json.NewDecoder(resp.Body).Decode(&result)
			resp.Body.Close()
			if decodeErr != nil {
				return "", fmt.Errorf("response decode error: %w", decodeErr)
			}
			log.Printf("Job submitted: id=%d job_id=%s", result.ID, result.JobID)
			return result.JobID, nil
		}

		// Drain before closing so the connection can be reused; a failure to
		// drain is irrelevant because the attempt is already being retried.
		_, _ = io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
		lastErr = fmt.Errorf("HTTP %d", resp.StatusCode)
		log.Printf("Job fehlgeschlagen mit Status %d (Versuch %d/%d)", resp.StatusCode, attempt+1, MaxRetries)
	}
	return "", fmt.Errorf("job failed after %d attempts: %w", MaxRetries, lastErr)
}

// fetchJobStatus GETs the status document of one job from the source worker.
// The job ID is path-escaped. Any response other than HTTP 200 is an error.
func (sg *ScenarioGenerator) fetchJobStatus(ctx context.Context, sourceWorker Worker, jobIDStr string) (*MFTJobResponse, error) {
	encodedJobID := url.PathEscape(jobIDStr)
	apiURL := fmt.Sprintf("http://%s:%s/transfer-job-manager/v1/jobs/%s", sourceWorker.Host, Port, encodedJobID)

	httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, apiURL, nil)
	if err != nil {
		return nil, err
	}
	httpReq.Header.Set("Authorization", AuthHeader)

	resp, err := sg.httpClient.Do(httpReq)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("HTTP %d für job %s", resp.StatusCode, jobIDStr)
	}

	var result MFTJobResponse
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, fmt.Errorf("decode error: %w", err)
	}
	return &result, nil
}

// MFTServiceRestartThreshold is the number of consecutive polling errors
// after which waitForJobCompletion runs a service health check.
const MFTServiceRestartThreshold = 3

// systemd unit names checked by checkAndRestartWorkerServices.
const (
	serviceTransferJobManager = "transfer-job-manager"
	serviceTixstream          = "tixstream"
)

// checkAndRestartWorkerServices opens a fresh SSH connection to the worker,
// checks both services with "systemctl is-active" and restarts every service
// that is not reported as "active". Each restart attempt is recorded in the
// ground truth as MFT_SERVICE_RESTART. When at least one restart succeeded
// it sleeps 15 seconds (not interruptible) and returns true.
func (sg *ScenarioGenerator) checkAndRestartWorkerServices(worker Worker) bool {
	log.Printf("[HEALTH] Pruefe Services auf %s via SSH...", worker.Host)

	hostCfg, ok := sg.sm.Inventory.Workers[worker.ID]
	if !ok {
		log.Printf("[HEALTH] Worker %s nicht in Inventory gefunden", worker.ID)
		return false
	}

	client, err := NewSSHClient(hostCfg)
	if err != nil {
		log.Printf("[HEALTH] SSH-Verbindung zu %s fehlgeschlagen: %v", worker.Host, err)
		return false
	}
	defer client.Close()

	restarted := false
	for _, svc := range []string{serviceTransferJobManager, serviceTixstream} {
		out, err := client.RunCommand(fmt.Sprintf("systemctl is-active %s", svc))
		status := strings.TrimSpace(out)
		if err != nil || status != "active" {
			log.Printf("[HEALTH] Service %s auf %s ist '%s' - starte neu...", svc, worker.Host, status)
			sg.logGroundTruth("MFT_SERVICE_RESTART", worker.Host, svc, "N/A", false)

			restartOut, restartErr := client.RunCommand(fmt.Sprintf("sudo systemctl restart %s", svc))
			if restartErr != nil {
				log.Printf("[HEALTH] Restart von %s auf %s fehlgeschlagen: %v (output: %s)", svc, worker.Host, restartErr, restartOut)
				continue
			}
			log.Printf("[HEALTH] %s auf %s erfolgreich neugestartet", svc, worker.Host)
			restarted = true
		} else {
			log.Printf("[HEALTH] Service %s auf %s: OK (active)", svc, worker.Host)
		}
	}

	if restarted {
		log.Printf("[HEALTH] Warte 15s auf Service-Startup auf %s...", worker.Host)
		time.Sleep(15 * time.Second)
	}
	return restarted
}

// waitForJobCompletion polls the job status every five seconds until the job
// reaches MFTStatusCompleted (nil), a status >= MFTStatusFailed (error),
// JobCompletionTimeout elapses (error) or ctx is done (ctx.Err()).
// After MFTServiceRestartThreshold consecutive polling errors it triggers a
// service health check on the source worker and resets the error counter.
func (sg *ScenarioGenerator) waitForJobCompletion(ctx context.Context, sourceWorker Worker, jobIDStr string) error {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	timeout := time.After(JobCompletionTimeout)

	log.Printf("Warte auf Job-Abschluss: %s", jobIDStr)
	consecutiveErrors := 0

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-timeout:
			return fmt.Errorf("job %s timed out nach %s", jobIDStr, JobCompletionTimeout)
		case <-ticker.C:
			result, err := sg.fetchJobStatus(ctx, sourceWorker, jobIDStr)
			if err != nil {
				consecutiveErrors++
				log.Printf("Polling-Fehler fuer Job %s (%d/%d): %v", jobIDStr, consecutiveErrors, MFTServiceRestartThreshold, err)
				if consecutiveErrors >= MFTServiceRestartThreshold {
					sg.checkAndRestartWorkerServices(sourceWorker)
					consecutiveErrors = 0
				}
				continue
			}
			consecutiveErrors = 0

			log.Printf("Job %s: %s – %s (%d)", jobIDStr, result.StatusMessage, result.StatusDetail, result.StatusCode)
			switch {
			case result.StatusCode == MFTStatusCompleted:
				log.Printf("Job abgeschlossen: %s", jobIDStr)
				return nil
			case result.StatusCode == MFTStatusTemporaryError:
				// Keep polling; the message states the job is retried remotely.
				log.Printf("Job %s: temporaerer Fehler (880), warte auf Auto-Retry...", jobIDStr)
			case result.StatusCode >= MFTStatusFailed:
				return fmt.Errorf("job %s fehlgeschlagen (terminal): %s – %s", jobIDStr, result.StatusMessage, result.StatusDetail)
			}
		}
	}
}

// triggerAndWait submits req and then blocks until the job finishes, fails,
// times out or ctx is done.
func (sg *ScenarioGenerator) triggerAndWait(ctx context.Context, req JobRequest) error {
	jobIDStr, err := sg.triggerJobWithRetry(ctx, req)
	if err != nil {
		return err
	}
	return sg.waitForJobCompletion(ctx, req.SourceWorker, jobIDStr)
}

// NOTE(review): THE SOURCE IS TRUNCATED BELOW. The text between
// "time.Duration(1<" and "= 0 {" is missing from this file as received. The
// remainder of triggerJobOverSSHWithRetry, whatever definitions followed it,
// and the opening of the function that contains the phase loop (called as
// sg.Run elsewhere in this file) are lost. W0, W1, W2, initialPhase,
// fixedPhase, intn, shouldGenerateBurst, runBurstWorkload,
// runMaintenanceWorkload and localSyncSeries are used below but not defined
// in the visible text. The damaged span is reproduced token for token and
// must be restored from version control; it does not compile as it stands.
func (sg *ScenarioGenerator) triggerJobOverSSHWithRetry(ctx context.Context, req JobRequest) error {
	var lastErr error
	for attempt := range MaxRetries {
		if attempt > 0 {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(RetryBackoff * time.Duration(1<= 0 {
		log.Printf("WorkloadFilter aktiv: nur Phase '%s' (phase=%d) wird wiederholt", sg.config.WorkloadFilter, fixedPhase)
	}

	// Phase loop. Phases 0-4 are the five workloads and phase 5 is idle/burst.
	// With fixedPhase >= 0 every workload jumps straight to phase 5, and
	// phase 5 returns to max(fixedPhase, 0), so only that phase repeats.
	phase := initialPhase
	for {
		select {
		case <-ctx.Done():
			log.Println("Shutdown-Signal empfangen")
			return ctx.Err()
		default:
		}

		// Outside training mode, a maintenance workload is run before phase 0
		// when sg.intn(100) yields a value below 5.
		if sg.config.EnableMaintenance && sg.config.Mode != ModeTraining && phase == 0 && sg.intn(100) < 5 {
			sg.runMaintenanceWorkload(ctx)
		}

		switch phase {
		case 0:
			log.Println("PHASE 1: High Bandwidth (W0 -> W1)")
			sg.logGroundTruth("START_PHASE_BW", W0.Host, W1.Host, "HIGH_BW", false)
			if err := sg.triggerAndWait(ctx, JobRequest{
				SourceWorker: W0,
				DestWorker:   W1,
				FileURI:      "local_sync_source/5g.mxf",
				Description:  "Thesis Phase 1: High BW",
				WorkloadType: "HIGH_BW",
			}); err != nil {
				log.Printf("Phase 1 fehlgeschlagen: %v", err)
			}
			sg.logGroundTruth("END_PHASE_BW", W0.Host, W1.Host, "HIGH_BW", false)
			if fixedPhase >= 0 {
				phase = 5
			} else {
				phase = 1
			}

		case 1:
			log.Println("PHASE 2: High IOPS (W0 -> W2)")
			sg.logGroundTruth("START_PHASE_IOPS", W0.Host, W2.Host, "HIGH_IOPS", false)
			// 20 concurrent single-file jobs, started 200 ms apart, cycling
			// through small_1.dat .. small_10.dat.
			var iopsWg sync.WaitGroup
			for i := 1; i <= 20; i++ {
				select {
				case <-ctx.Done():
					return ctx.Err()
				default:
				}
				iopsWg.Add(1)
				go func(fileIdx int) {
					defer iopsWg.Done()
					if err := sg.triggerAndWait(ctx, JobRequest{
						SourceWorker: W0,
						DestWorker:   W2,
						FileURI:      fmt.Sprintf("local_sync_source/small_%d.dat", (fileIdx%10)+1),
						Description:  fmt.Sprintf("Thesis Phase 2: File %d", fileIdx),
						WorkloadType: "HIGH_IOPS",
					}); err != nil {
						log.Printf("IOPS Job %d fehlgeschlagen: %v", fileIdx, err)
					}
				}(i)
				time.Sleep(200 * time.Millisecond)
			}
			iopsWg.Wait()
			sg.logGroundTruth("END_PHASE_IOPS", W0.Host, W2.Host, "HIGH_IOPS", false)
			if fixedPhase >= 0 {
				phase = 5
			} else {
				phase = 2
			}

		case 2:
			log.Println("PHASE 3: Interferenz (W1 -> W0)")
			sg.logGroundTruth("START_PHASE_INTERFERENCE", W1.Host, W0.Host, "INTERFERENCE", false)
			if err := sg.triggerAndWait(ctx, JobRequest{
				SourceWorker: W1,
				DestWorker:   W0,
				FileURI:      "local_sync_source/1g.mxf",
				Description:  "Thesis Phase 3: Interference",
				WorkloadType: "INTERFERENCE",
			}); err != nil {
				log.Printf("Phase 3 fehlgeschlagen: %v", err)
			}
			sg.cleanupRemote()
			sg.logGroundTruth("END_PHASE_INTERFERENCE", W1.Host, W0.Host, "INTERFERENCE", false)
			if fixedPhase >= 0 {
				phase = 5
			} else {
				phase = 3
			}

		case 3:
			log.Println("PHASE 4: Batch Transfer OUT (W0 -> W2)")
			sg.logGroundTruth("START_PHASE_BATCH_OUT", W0.Host, W2.Host, "BATCH_IOPS_OUT", false)
			if err := sg.triggerAndWait(ctx, JobRequest{
				SourceWorker: W0,
				DestWorker:   W2,
				FileURIs:     localSyncSeries("small_", 1, 20),
				Description:  "Thesis Phase 4: Bulk Transfer Out",
				WorkloadType: "BATCH_IOPS_OUT",
			}); err != nil {
				log.Printf("Batch Out fehlgeschlagen: %v", err)
			}
			sg.logGroundTruth("END_PHASE_BATCH_OUT", W0.Host, W2.Host, "BATCH_IOPS_OUT", false)
			if fixedPhase >= 0 {
				phase = 5
			} else {
				phase = 4
			}

		case 4:
			log.Println("PHASE 5: Batch Transfer IN (W1 -> W0)")
			sg.logGroundTruth("START_PHASE_BATCH_IN", W1.Host, W0.Host, "BATCH_IOPS_IN", false)
			if err := sg.triggerAndWait(ctx, JobRequest{
				SourceWorker: W1,
				DestWorker:   W0,
				FileURIs:     localSyncSeries("small_", 1, 20),
				Description:  "Thesis Phase 5: Bulk Transfer In",
				WorkloadType: "BATCH_IOPS_IN",
			}); err != nil {
				log.Printf("Batch In fehlgeschlagen: %v", err)
			}
			sg.cleanupRemote()
			sg.logGroundTruth("END_PHASE_BATCH_IN", W1.Host, W0.Host, "BATCH_IOPS_IN", false)
			// The idle/burst phase follows in both the full cycle and the
			// fixed-phase case, so no branch on fixedPhase is needed here.
			phase = 5

		case 5:
			if sg.config.Mode == ModeTesting && sg.shouldGenerateBurst() {
				sg.runBurstWorkload(ctx)
			} else {
				log.Println("PHASE 6: IDLE (Baseline)")
				sg.logGroundTruth("IDLE", "ALL", "ALL", "IDLE", false)
				select {
				case <-ctx.Done():
					return ctx.Err()
				case <-time.After(60 * time.Second):
				}
			}
			phase = max(fixedPhase, 0)
			sg.cleanupRemote()
			log.Println("Zyklus abgeschlossen, starte von vorne...")
		}
	}
}

// RunExperiment runs the built-in experiment: a baseline phase in which only
// the workload runs for BaselineDuration, followed by one chaos window per
// scenario (ChaosDuration active, then CooldownDuration) while the workload
// keeps running in the background. When no scenarios are configured, all
// scenarios known to the ScenarioManager are used.
//
// Every EXPERIMENT_CHAOS_START row in the ground truth is closed by an
// EXPERIMENT_CHAOS_END row, also when the scenario fails to start or the
// context is cancelled during the chaos window.
//
// It returns ctx.Err() when ctx ends early, a wrapped error when the
// baseline workload fails for a non-context reason, and nil otherwise.
func (sg *ScenarioGenerator) RunExperiment(ctx context.Context) error {
	exp := sg.config.Experiment

	scenarios := exp.Scenarios
	if len(scenarios) == 0 {
		for _, s := range sg.sm.GetScenarios() {
			scenarios = append(scenarios, s.ID)
		}
	}

	totalDuration := exp.BaselineDuration + time.Duration(len(scenarios))*(exp.ChaosDuration+exp.CooldownDuration)
	log.Printf("Experiment gestartet. Geschätzte Gesamtdauer: %.1f Stunden", totalDuration.Hours())
	log.Printf(" Baseline: %.0f Min | Chaos/Szenario: %.0f Min | Cooldown: %.0f Min | Szenarien: %d",
		exp.BaselineDuration.Minutes(), exp.ChaosDuration.Minutes(), exp.CooldownDuration.Minutes(), len(scenarios))

	log.Printf(" PHASE 1: BASELINE (%.0f Min)", exp.BaselineDuration.Minutes())
	sg.logGroundTruth("EXPERIMENT_BASELINE_START", "ALL", "ALL", "EXPERIMENT", false)

	baselineCtx, baselineCancel := context.WithTimeout(ctx, exp.BaselineDuration)
	err := sg.Run(baselineCtx)
	baselineCancel()
	// The baseline is expected to end through its own deadline.
	if err != nil && err != context.DeadlineExceeded && err != context.Canceled {
		return fmt.Errorf("baseline fehlgeschlagen: %w", err)
	}
	sg.logGroundTruth("EXPERIMENT_BASELINE_END", "ALL", "ALL", "EXPERIMENT", false)

	log.Println("Baseline abgeschlossen. 10s Pause vor Chaos-Phase...")
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(10 * time.Second):
	}

	log.Printf(" PHASE 2: CHAOS INJEKTION (%d Szenarien)", len(scenarios))

	chaosWorkloadCtx, chaosWorkloadCancel := context.WithCancel(ctx)
	defer chaosWorkloadCancel()

	workloadDone := make(chan error, 1)
	go func() { workloadDone <- sg.Run(chaosWorkloadCtx) }()

	// stopWorkload cancels the background workload and waits for it to exit.
	stopWorkload := func() {
		chaosWorkloadCancel()
		<-workloadDone
	}

	for i, scenarioID := range scenarios {
		select {
		case <-ctx.Done():
			sg.sm.StopScenario()
			stopWorkload()
			return ctx.Err()
		default:
		}

		log.Printf("[%d/%d] CHAOS START: %s", i+1, len(scenarios), scenarioID)
		sg.logGroundTruth("EXPERIMENT_CHAOS_START", "Orchestrator", "System", scenarioID, true)

		if err := sg.sm.StartScenario(scenarioID); err != nil {
			log.Printf("Fehler beim Starten von %s: %v – überspringe", scenarioID, err)
			// Close the interval opened above so the ground truth does not
			// carry an anomaly window that never ends.
			sg.logGroundTruth("EXPERIMENT_CHAOS_END", "Orchestrator", "System", scenarioID, true)
			continue
		}

		select {
		case <-ctx.Done():
			sg.sm.StopScenario()
			sg.logGroundTruth("EXPERIMENT_CHAOS_END", "Orchestrator", "System", scenarioID, true)
			stopWorkload()
			return ctx.Err()
		case <-time.After(exp.ChaosDuration):
		}

		sg.sm.StopScenario()
		sg.logGroundTruth("EXPERIMENT_CHAOS_END", "Orchestrator", "System", scenarioID, true)
		log.Printf("[%d/%d] CHAOS STOP: %s | Cooldown: %.0f Min", i+1, len(scenarios), scenarioID, exp.CooldownDuration.Minutes())

		select {
		case <-ctx.Done():
			stopWorkload()
			return ctx.Err()
		case <-time.After(exp.CooldownDuration):
		}
	}

	stopWorkload()

	log.Println(" EXPERIMENT ABGESCHLOSSEN")
	sg.logGroundTruth("EXPERIMENT_END", "ALL", "ALL", "EXPERIMENT", false)
	return nil
}

// startAPIServer starts the control API on APIPort in the background:
//
//	GET  /scenarios            list the known scenarios as JSON
//	POST /scenarios/start?id=… start the scenario with the given ID
//	POST /scenarios/stop       stop the running scenario
//
// cancelFunc is called when the listener fails for any reason other than a
// regular shutdown. The server is shut down (5 s grace) once ctx is done.
func (sg *ScenarioGenerator) startAPIServer(ctx context.Context, cancelFunc context.CancelFunc) {
	mux := http.NewServeMux()

	mux.HandleFunc("/scenarios", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodGet {
			http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
			return
		}
		scenarios := sg.sm.GetScenarios()
		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(scenarios); err != nil {
			log.Printf("API: Szenarien konnten nicht kodiert werden: %v", err)
		}
	})

	mux.HandleFunc("/scenarios/start", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
			return
		}
		id := r.URL.Query().Get("id")
		if id == "" {
			http.Error(w, "Missing id parameter", http.StatusBadRequest)
			return
		}
		if err := sg.sm.StartScenario(id); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		fmt.Fprintf(w, "Scenario %s started", id)
	})

	mux.HandleFunc("/scenarios/stop", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
			return
		}
		sg.sm.StopScenario()
		fmt.Fprintf(w, "Scenario stopped")
	})

	server := &http.Server{
		Addr:    ":" + APIPort,
		Handler: mux,
		// Bound the time a client may take to send its request headers so
		// idle connections cannot pin server goroutines indefinitely.
		ReadHeaderTimeout: 10 * time.Second,
	}

	go func() {
		log.Printf("API Server gestartet auf Port %s", APIPort)
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Printf("API Server Fehler: %v", err)
			cancelFunc()
		}
	}()

	go func() {
		<-ctx.Done()
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := server.Shutdown(shutdownCtx); err != nil {
			log.Printf("API Server Shutdown Fehler: %v", err)
		}
	}()
}

// main parses the flags, validates mode and workload filter, wires up the
// generator, the ScenarioManager and the control API, and then runs either
// the experiment or the plain workload loop until it ends or SIGINT/SIGTERM
// arrives.
func main() {
	log.SetFlags(log.LstdFlags | log.Lshortfile)

	mode := flag.String("mode", "training", "Szenario-Modus: training, testing, anomaly-only")
	worker := flag.String("worker", "thesis-worker-0", "Überwachter Worker")
	burst := flag.Bool("burst", true, "Burst-Traffic aktivieren")
	maintenance := flag.Bool("maintenance", true, "Wartungsfenster aktivieren")
	anomalyRate := flag.Float64("anomaly-rate", 0.1, "Anomalie-Rate im Testing-Mode")
	inventoryPath := flag.String("inventory", "inventory.ini", "Pfad zur Ansible inventory.ini")
	runExperiment := flag.Bool("run-experiment", false, "Vollständiges Experiment ausführen (Baseline + Chaos)")
	baselineHours := flag.Float64("baseline-hours", 6, "Dauer der Baseline-Phase in Stunden")
	chaosMins := flag.Float64("chaos-mins", 20, "Dauer jedes aktiven Chaos-Szenarios in Minuten")
	cooldownMins := flag.Float64("cooldown-mins", 20, "Cooldown-Dauer zwischen Szenarien in Minuten")
	workload := flag.String("workload", "all", "Workload-Filter: all | high-bw | high-iops | interference | batch-out | batch-in | idle")
	flag.Parse()

	config := Config{
		Mode:              ScenarioMode(*mode),
		MonitoredWorker:   *worker,
		EnableBurst:       *burst,
		EnableMaintenance: *maintenance,
		AnomalyRate:       *anomalyRate,
		RunExperiment:     *runExperiment,
		WorkloadFilter:    WorkloadFilter(*workload),
		Experiment: ExperimentConfig{
			BaselineDuration: time.Duration(*baselineHours * float64(time.Hour)),
			ChaosDuration:    time.Duration(*chaosMins * float64(time.Minute)),
			CooldownDuration: time.Duration(*cooldownMins * float64(time.Minute)),
		},
	}

	if config.Mode != ModeTraining && config.Mode != ModeTesting && config.Mode != ModeAnomalyOnly {
		log.Fatalf("Ungültiger Modus: %s", config.Mode)
	}

	validWorkloads := map[WorkloadFilter]bool{
		WorkloadAll:          true,
		WorkloadHighBW:       true,
		WorkloadHighIOPS:     true,
		WorkloadInterference: true,
		WorkloadBatchOut:     true,
		WorkloadBatchIn:      true,
		WorkloadIdle:         true,
	}
	if !validWorkloads[config.WorkloadFilter] {
		log.Fatalf("Ungültiger Workload-Filter: %s (erlaubt: all, high-bw, high-iops, interference, batch-out, batch-in, idle)", config.WorkloadFilter)
	}

	log.Println("Thesis Scenario Generator gestartet")
	log.Printf(" Modus: %s | Experiment: %v | Workload: %s", config.Mode, config.RunExperiment, config.WorkloadFilter)

	inv, err := ParseInventory(*inventoryPath)
	if err != nil {
		log.Fatalf("Fehler beim Parsen des Inventory: %v", err)
	}

	generator, err := NewScenarioGenerator(config, inv)
	if err != nil {
		log.Fatalf("Fehler beim Initialisieren des Generators: %v", err)
	}

	sm, err := NewScenarioManager(inv, generator.logGroundTruth)
	if err != nil {
		// log.Fatalf exits without running defers, so release the log here.
		if closeErr := generator.Close(); closeErr != nil {
			log.Printf("Fehler beim Schließen: %v", closeErr)
		}
		log.Fatalf("Fehler beim Initialisieren des Scenario Managers: %v", err)
	}
	generator.sm = sm

	// cleanup stops any running scenario BEFORE closing the ground-truth log:
	// the manager was handed generator.logGroundTruth as its callback, so the
	// log must still be open while the scenario is being stopped.
	cleanup := func() {
		sm.StopScenario()
		if err := generator.Close(); err != nil {
			log.Printf("Fehler beim Schließen: %v", err)
		}
	}
	defer cleanup()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	generator.startAPIServer(ctx, cancel)

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	errChan := make(chan error, 1)
	go func() {
		if config.RunExperiment {
			errChan <- generator.RunExperiment(ctx)
		} else {
			errChan <- generator.Run(ctx)
		}
	}()

	select {
	case sig := <-sigChan:
		log.Printf("Signal %v empfangen, fahre graceful herunter...", sig)
		cancel()

		// Wait for the run loop itself as well as the tracked workers, so
		// nothing still writes ground truth while the log is being closed.
		done := make(chan struct{})
		go func() {
			<-errChan
			generator.wg.Wait()
			close(done)
		}()

		select {
		case <-done:
			log.Println("Alle Worker beendet")
		case <-time.After(30 * time.Second):
			log.Println("Shutdown Timeout erreicht")
		}

	case err := <-errChan:
		if err != nil && err != context.Canceled && err != context.DeadlineExceeded {
			// log.Fatalf calls os.Exit, which skips deferred functions; run
			// the cleanup explicitly so the scenario is stopped and the log
			// is flushed before exiting with a failure status.
			cancel()
			cleanup()
			log.Fatalf("Generator-Fehler: %v", err)
		}
	}

	log.Println("Scenario Generator beendet")
}