bachelor-thesis/scenario_generator/main.go

1146 lines
31 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
	"bytes"
	"context"
	"encoding/csv"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
	"time"
)
// Fixed endpoints, credentials, paths and retry/timeout settings used throughout this file.
const (
// Port is the TCP port of the transfer-job-manager REST API on every worker.
Port = "60000"
// APIPort is the port on which this generator serves its own /scenarios control API.
APIPort = "60001"
// AuthHeader is sent verbatim as the Authorization header of every worker API request.
// NOTE(review): this is a hard-coded Basic credential in source — consider loading it from the environment.
AuthHeader = "Basic YWRtaW46dmVyeXNlY3JldA=="
// LogFile is the CSV file that ground-truth rows are appended to.
LogFile = "./thesis_scenario_ground_truth.csv"
// IncomingDir is the remote directory that cleanupRemote purges of files older than 10 minutes.
IncomingDir = "/local_testdata/test-destination"
// WorkerPoolSize is the number of goroutines startWorkerPool launches to drain the job queue.
WorkerPoolSize = 10
// MaxRetries is the number of attempts made per trigger round (HTTP and SSH variants).
MaxRetries = 3
// RetryBackoff is the base delay between trigger attempts; callers scale it by the attempt number.
RetryBackoff = 2 * time.Second
// MFTStatusCompleted is the job status code treated as successful completion.
MFTStatusCompleted = 870
// MFTStatusFailed is the lowest job status code treated as a terminal failure.
MFTStatusFailed = 900
// JobCompletionTimeout bounds how long a single job is polled before giving up.
JobCompletionTimeout = 5 * time.Minute
)
// ScenarioMode selects how the generator injects and labels workload.
// It is populated from the -mode command-line flag in main.
type ScenarioMode string
// Accepted ScenarioMode values; main rejects any other value.
const (
// ModeTraining never produces bursts (shouldGenerateBurst) and skips the random maintenance window in Run.
ModeTraining ScenarioMode = "training"
// ModeTesting lets the idle phase of Run be replaced by a burst with probability AnomalyRate*0.5.
ModeTesting ScenarioMode = "testing"
// ModeAnomalyOnly is accepted by main; shouldGenerateBurst gives it a fixed 0.2 burst probability.
// NOTE(review): Run only calls shouldGenerateBurst in testing mode, so that branch looks unreachable from Run — confirm intent.
ModeAnomalyOnly ScenarioMode = "anomaly-only"
)
// ExperimentConfig holds the full experiment timing configuration.
// Used by the built-in experiment runner (--run-experiment flag).
type ExperimentConfig struct {
// BaselineDuration is how long the anomaly-free workload runs before chaos injection starts.
BaselineDuration time.Duration
// ChaosDuration is how long each chaos scenario stays active.
ChaosDuration time.Duration
// CooldownDuration is the pause after each scenario has been stopped.
CooldownDuration time.Duration
// Scenarios lists the scenario IDs to run, in order.
Scenarios []string // if empty, all scenarios are used
}
// Worker identifies one transfer node of the test bed.
type Worker struct {
// Name is the display name ("Worker Node <i>"); it is also sent as destination_system in transfer jobs.
Name string
// Host is the address used for HTTP calls: the inventory IP when available, otherwise the worker ID.
Host string
// ID is the inventory key of the worker ("thesis-worker-<i>").
ID string
}
// TransferJob is the JSON body POSTed to a worker's transfer-job-manager
// jobs endpoint by tryTriggerJob.
type TransferJob struct {
// Description is free text copied from the JobRequest.
Description string `json:"description"`
// Schedule is always sent as "NOW" by tryTriggerJob.
Schedule string `json:"schedule"`
// DestinationSystem carries the destination worker's Name.
DestinationSystem string `json:"destination_system"`
// DestinationShare is always sent as "local_sync_destination" by tryTriggerJob.
DestinationShare string `json:"destination_share"`
// SourceFileURIs lists the files to transfer.
SourceFileURIs []string `json:"source_file_uris"`
// WaitForPublish is always sent as false by tryTriggerJob.
WaitForPublish bool `json:"wait_for_publish"`
// FlatFileModeDisabled is always sent as false by tryTriggerJob.
FlatFileModeDisabled bool `json:"flat_file_mode_disabled"`
}
// MFTJobResponse is the JSON document decoded from the transfer-job-manager,
// both after submitting a job and when polling its status.
type MFTJobResponse struct {
// ID is the numeric identifier; it is only logged after submission.
ID int `json:"id"`
// JobID is the string identifier used to poll the job afterwards.
JobID string `json:"job_id"`
// StatusCode is compared against MFTStatusCompleted, 880 and MFTStatusFailed while polling.
StatusCode int `json:"status_code"`
// StatusMessage and StatusDetail are only used for log and error text.
StatusMessage string `json:"status_message"`
StatusDetail string `json:"status_detail"`
}
// JobRequest describes one transfer to trigger, either directly or through
// the worker-pool job queue.
type JobRequest struct {
// SourceWorker is the node whose API receives the job.
SourceWorker Worker
// DestWorker is the node the files are sent to.
DestWorker Worker
// Schedule is not read by the trigger code in this file (jobs are always sent with "NOW").
Schedule string
// FileURI is a single source file; it is used only when FileURIs is empty.
FileURI string
// FileURIs is the list of source files and takes precedence over FileURI.
FileURIs []string
// Description is passed through to the transfer job.
Description string
// WorkloadType and IsAnomaly are written to the ground-truth log by the worker pool.
WorkloadType string
IsAnomaly bool
}
// WorkloadFilter restricts Run to a single workload phase; it is set from
// the -workload flag.
type WorkloadFilter string
// Accepted WorkloadFilter values. Run maps each value except WorkloadAll to
// a fixed phase index (shown in parentheses).
const (
// WorkloadAll cycles through all phases.
WorkloadAll WorkloadFilter = "all"
// WorkloadHighBW pins the high-bandwidth phase (0).
WorkloadHighBW WorkloadFilter = "high-bw"
// WorkloadHighIOPS pins the high-IOPS phase (1).
WorkloadHighIOPS WorkloadFilter = "high-iops"
// WorkloadInterference pins the interference phase (2).
WorkloadInterference WorkloadFilter = "interference"
// WorkloadBatchOut pins the outgoing batch phase (3).
WorkloadBatchOut WorkloadFilter = "batch-out"
// WorkloadBatchIn pins the incoming batch phase (4).
WorkloadBatchIn WorkloadFilter = "batch-in"
// WorkloadIdle pins the idle phase (5).
WorkloadIdle WorkloadFilter = "idle"
)
// Config is the runtime configuration assembled from command-line flags in main.
type Config struct {
// Mode is the scenario mode; it is also written into every ground-truth row.
Mode ScenarioMode
// MonitoredWorker is the ID of the worker used as W0 and as the remote cleanup target.
MonitoredWorker string
// EnableBurst allows burst traffic outside training mode.
EnableBurst bool
// EnableMaintenance allows the maintenance backup workload.
EnableMaintenance bool
// AnomalyRate scales the burst probability in testing mode (probability = AnomalyRate*0.5).
AnomalyRate float64
// RunExperiment makes main call RunExperiment instead of Run.
RunExperiment bool
// Experiment holds the phase durations used by RunExperiment.
Experiment ExperimentConfig
// WorkloadFilter optionally pins Run to a single phase.
WorkloadFilter WorkloadFilter
}
// ScenarioGenerator drives transfer workloads against the workers and
// records what it did in the ground-truth CSV log.
type ScenarioGenerator struct {
// workers is the fixed list of nodes built by NewScenarioGenerator.
workers []Worker
config Config
// logWriter/logFile back the ground-truth CSV; access is serialized by logMutex.
logWriter *csv.Writer
logFile *os.File
logMutex sync.Mutex
// httpClient is shared by all worker API calls.
httpClient *http.Client
// rand is guarded by randMutex (see intn and float64n).
rand *rand.Rand
randMutex sync.Mutex
// jobQueue feeds the goroutines started by startWorkerPool.
jobQueue chan JobRequest
// wg tracks the worker-pool goroutines so that main can wait for them on shutdown.
wg sync.WaitGroup
// sm is nil after NewScenarioGenerator; main assigns it before anything runs.
sm *ScenarioManager
}
// NewScenarioGenerator builds a generator for the three fixed workers
// "thesis-worker-0" .. "thesis-worker-2" and opens the ground-truth CSV log.
//
// Each worker's Host defaults to its ID; when inv is non-nil and holds an
// entry with a non-empty IP for that ID, the IP is used instead. The log file
// is opened in append mode and the CSV header is written only while the file
// is still empty.
//
// The returned generator has no ScenarioManager attached (sm is nil); the
// caller has to assign one before calling methods that use it.
//
// An error is returned when the log file cannot be opened or inspected, or
// when the header cannot be written; the file is closed again in those cases.
func NewScenarioGenerator(config Config, inv *Inventory) (*ScenarioGenerator, error) {
	const workerCount = 3
	workers := make([]Worker, 0, workerCount)
	for i := range workerCount {
		id := fmt.Sprintf("thesis-worker-%d", i)
		host := id
		if inv != nil {
			if h, ok := inv.Workers[id]; ok && h.IP != "" {
				host = h.IP
			}
		}
		workers = append(workers, Worker{
			Name: fmt.Sprintf("Worker Node %d", i),
			Host: host,
			ID:   id,
		})
	}

	logFile, err := os.OpenFile(LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		return nil, fmt.Errorf("kann Log-Datei nicht öffnen: %w", err)
	}
	// A failed Stat leaves no usable FileInfo, so it must not be ignored.
	stat, err := logFile.Stat()
	if err != nil {
		logFile.Close()
		return nil, fmt.Errorf("kann Log-Datei nicht prüfen: %w", err)
	}

	writer := csv.NewWriter(logFile)
	if stat.Size() == 0 {
		if err := writer.Write([]string{
			"timestamp", "action", "source", "target", "workload_type", "is_anomaly", "scenario_mode",
		}); err != nil {
			logFile.Close()
			return nil, fmt.Errorf("kann Header nicht schreiben: %w", err)
		}
		// csv.Writer buffers its output; write errors only surface through Error after Flush.
		writer.Flush()
		if err := writer.Error(); err != nil {
			logFile.Close()
			return nil, fmt.Errorf("kann Header nicht schreiben: %w", err)
		}
	}

	return &ScenarioGenerator{
		workers:    workers,
		config:     config,
		logWriter:  writer,
		logFile:    logFile,
		httpClient: &http.Client{Timeout: 30 * time.Second},
		rand:       rand.New(rand.NewSource(time.Now().UnixNano())),
		jobQueue:   make(chan JobRequest, 100),
		sm:         nil,
	}, nil
}
// Close flushes any buffered ground-truth rows and closes the log file.
//
// The file is always closed. A flush error takes precedence in the returned
// error because it means log rows were lost; otherwise the result of closing
// the file is returned.
func (sg *ScenarioGenerator) Close() error {
	sg.logMutex.Lock()
	defer sg.logMutex.Unlock()

	sg.logWriter.Flush()
	// Flush itself returns nothing; the write error has to be fetched explicitly.
	flushErr := sg.logWriter.Error()
	closeErr := sg.logFile.Close()
	if flushErr != nil {
		return fmt.Errorf("flush ground-truth log: %w", flushErr)
	}
	return closeErr
}
// logGroundTruth appends one row to the ground-truth CSV and mirrors it to
// the process log.
//
// The row consists of the current local time (second resolution), action,
// source, target, workloadType, "1"/"0" for isAnomaly and the configured
// scenario mode. Writes are serialized by logMutex and flushed immediately.
// If the CSV write fails, the error is logged and nothing else is emitted.
func (sg *ScenarioGenerator) logGroundTruth(action, source, target, workloadType string, isAnomaly bool) {
	sg.logMutex.Lock()
	defer sg.logMutex.Unlock()

	now := time.Now().Format("2006-01-02 15:04:05")
	anomalyField, marker := "0", ""
	if isAnomaly {
		anomalyField, marker = "1", " [ANOMALY]"
	}

	row := []string{now, action, source, target, workloadType, anomalyField, string(sg.config.Mode)}
	if err := sg.logWriter.Write(row); err != nil {
		log.Printf("Fehler beim Schreiben in Log: %v", err)
		return
	}
	sg.logWriter.Flush()

	log.Printf("[GT] %s | %s: %s -> %s (%s)%s", now, action, source, target, workloadType, marker)
}
// cleanupRemote deletes files older than ten minutes from IncomingDir on the
// monitored worker by running a `sudo find ... -delete` over SSH.
//
// The SSH client is looked up in the scenario manager under the configured
// MonitoredWorker value. Failures are logged only; nothing is returned.
func (sg *ScenarioGenerator) cleanupRemote() {
	target := sg.config.MonitoredWorker
	log.Printf("Bereinige Incoming-Ordner auf %s (via SSH)...", target)

	sshClient, found := sg.sm.SSHManager[target]
	if !found {
		log.Printf("Fehler: Kein SSH-Client für %s gefunden", target)
		return
	}

	findCmd := "sudo find " + IncomingDir + " -type f -mmin +10 -delete"
	if output, err := sshClient.RunCommand(findCmd); err != nil {
		log.Printf("Remote Cleanup fehlgeschlagen: %v (Output: %s)", err, output)
		return
	}
	log.Println("Remote Cleanup erfolgreich abgeschlossen.")
}
// waitForAPI blocks until the worker's transfer-job-manager jobs endpoint
// answers a GET with HTTP 200, or until ctx is done.
//
// The endpoint is probed immediately and then every 10 seconds, so an
// already-running worker is detected without an initial 10 s delay.
//
// It returns nil once the worker is online, ctx.Err() when ctx ends first,
// and an error if the probe request cannot be constructed at all (retrying
// cannot fix that).
func (sg *ScenarioGenerator) waitForAPI(ctx context.Context, worker Worker) error {
	apiURL := fmt.Sprintf("http://%s:%s/transfer-job-manager/v1/jobs", worker.Host, Port)
	log.Printf("Prüfe Verfügbarkeit von %s (%s)...", worker.Name, worker.Host)

	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		if err := ctx.Err(); err != nil {
			return err
		}

		req, err := http.NewRequestWithContext(ctx, http.MethodGet, apiURL, nil)
		if err != nil {
			return fmt.Errorf("build availability request for %s: %w", worker.Name, err)
		}
		req.Header.Set("Authorization", AuthHeader)

		resp, err := sg.httpClient.Do(req)
		if err == nil {
			online := resp.StatusCode == http.StatusOK
			resp.Body.Close()
			if online {
				log.Printf("%s ist ONLINE", worker.Name)
				return nil
			}
		}

		log.Printf("Warte auf %s... Retry in 10s", worker.Name)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}
// triggerJobWithRetry submits req and, when a whole round of attempts fails,
// runs a service health check on the source worker before trying again.
//
// Up to maxServiceRecoveryAttempts health checks are performed, i.e. at most
// maxServiceRecoveryAttempts+1 rounds of tryTriggerJob. It returns the job ID
// on success. If ctx is done, ctx.Err() is returned right away without
// touching the worker's services: a cancelled submission says nothing about
// service health. Otherwise the last round's error is returned, wrapped.
func (sg *ScenarioGenerator) triggerJobWithRetry(ctx context.Context, req JobRequest) (string, error) {
	const maxServiceRecoveryAttempts = 2
	for recoveryAttempt := 0; ; recoveryAttempt++ {
		jobID, err := sg.tryTriggerJob(ctx, req)
		if err == nil {
			return jobID, nil
		}
		// Do not SSH into the worker or restart services during shutdown.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return "", ctxErr
		}
		if recoveryAttempt >= maxServiceRecoveryAttempts {
			return "", fmt.Errorf("job failed after service recovery: %w", err)
		}
		log.Printf("[TRIGGER] Alle %d Versuche fehlgeschlagen, starte Health-Check auf %s...",
			MaxRetries, req.SourceWorker.Host)
		sg.checkAndRestartWorkerServices(req.SourceWorker)
	}
}
// tryTriggerJob performs up to MaxRetries POST attempts to submit req to the
// source worker's transfer-job-manager, without any service recovery.
//
// The request body and URL do not change between attempts, so they are built
// once up front. Before each retry the function waits RetryBackoff multiplied
// by the attempt index, or returns ctx.Err() if ctx ends first. FileURIs
// takes precedence over FileURI as the list of source files.
//
// It returns the job_id from the first 2xx response. Transport errors and
// non-2xx responses are retried; a 2xx response whose body cannot be decoded
// is returned as an error immediately. After MaxRetries failed attempts the
// last error is returned, wrapped.
func (sg *ScenarioGenerator) tryTriggerJob(ctx context.Context, req JobRequest) (string, error) {
	sourceFiles := req.FileURIs
	if len(sourceFiles) == 0 && req.FileURI != "" {
		sourceFiles = []string{req.FileURI}
	}
	payload, err := json.Marshal(TransferJob{
		Description:          req.Description,
		Schedule:             "NOW",
		DestinationSystem:    req.DestWorker.Name,
		DestinationShare:     "local_sync_destination",
		SourceFileURIs:       sourceFiles,
		WaitForPublish:       false,
		FlatFileModeDisabled: false,
	})
	if err != nil {
		return "", fmt.Errorf("marshal error: %w", err)
	}
	apiURL := fmt.Sprintf("http://%s:%s/transfer-job-manager/v1/jobs", req.SourceWorker.Host, Port)

	var lastErr error
	for attempt := range MaxRetries {
		if attempt > 0 {
			select {
			case <-ctx.Done():
				return "", ctx.Err()
			case <-time.After(RetryBackoff * time.Duration(attempt)):
			}
		}

		// A fresh reader per attempt: the previous one has been consumed.
		httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, apiURL, bytes.NewReader(payload))
		if err != nil {
			return "", fmt.Errorf("request creation error: %w", err)
		}
		httpReq.Header.Set("Authorization", AuthHeader)
		httpReq.Header.Set("Content-Type", "application/json")

		resp, err := sg.httpClient.Do(httpReq)
		if err != nil {
			lastErr = err
			log.Printf("Fehler bei Job-Trigger (Versuch %d/%d): %v", attempt+1, MaxRetries, err)
			continue
		}

		if resp.StatusCode >= 200 && resp.StatusCode < 300 {
			var result MFTJobResponse
			decodeErr := json.NewDecoder(resp.Body).Decode(&result)
			resp.Body.Close()
			if decodeErr != nil {
				return "", fmt.Errorf("response decode error: %w", decodeErr)
			}
			log.Printf("Job submitted: id=%d job_id=%s", result.ID, result.JobID)
			return result.JobID, nil
		}

		// Drain the body so the transport can reuse the connection for the retry.
		io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
		lastErr = fmt.Errorf("HTTP %d", resp.StatusCode)
		log.Printf("Job fehlgeschlagen mit Status %d (Versuch %d/%d)", resp.StatusCode, attempt+1, MaxRetries)
	}
	return "", fmt.Errorf("job failed after %d attempts: %w", MaxRetries, lastErr)
}
// fetchJobStatus retrieves the current state of one job from the source
// worker's transfer-job-manager.
//
// jobIDStr is path-escaped before it is placed into the URL. Any response
// other than HTTP 200 is reported as an error, as are transport failures and
// bodies that do not decode into MFTJobResponse.
func (sg *ScenarioGenerator) fetchJobStatus(ctx context.Context, sourceWorker Worker, jobIDStr string) (*MFTJobResponse, error) {
	endpoint := fmt.Sprintf("http://%s:%s/transfer-job-manager/v1/jobs/%s",
		sourceWorker.Host, Port, url.PathEscape(jobIDStr))

	request, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, err
	}
	request.Header.Set("Authorization", AuthHeader)

	response, err := sg.httpClient.Do(request)
	if err != nil {
		return nil, err
	}
	defer response.Body.Close()

	if code := response.StatusCode; code != http.StatusOK {
		return nil, fmt.Errorf("HTTP %d für job %s", code, jobIDStr)
	}

	status := new(MFTJobResponse)
	if err := json.NewDecoder(response.Body).Decode(status); err != nil {
		return nil, fmt.Errorf("decode error: %w", err)
	}
	return status, nil
}
// MFTServiceRestartThreshold is the number of consecutive status-polling
// errors after which waitForJobCompletion runs a service health check.
const MFTServiceRestartThreshold = 3
// systemd unit names that checkAndRestartWorkerServices inspects and, if
// they are not active, restarts.
const (
serviceTransferJobManager = "transfer-job-manager"
serviceTixstream = "tixstream"
)
// checkAndRestartWorkerServices opens a fresh SSH connection to worker and
// makes sure the transfer-job-manager and tixstream units are active.
//
// Each unit is queried with `systemctl is-active`. A unit that is not
// reported as "active" (or whose query fails) gets an MFT_SERVICE_RESTART
// ground-truth row and a `sudo systemctl restart`. If at least one restart
// succeeded, the function sleeps 15 seconds to let the services come up.
//
// It returns true when at least one service was restarted successfully, and
// false otherwise, including when the worker is missing from the inventory
// or the SSH connection cannot be established.
func (sg *ScenarioGenerator) checkAndRestartWorkerServices(worker Worker) bool {
	log.Printf("[HEALTH] Pruefe Services auf %s via SSH...", worker.Host)

	hostCfg, known := sg.sm.Inventory.Workers[worker.ID]
	if !known {
		log.Printf("[HEALTH] Worker %s nicht in Inventory gefunden", worker.ID)
		return false
	}

	client, err := NewSSHClient(hostCfg)
	if err != nil {
		log.Printf("[HEALTH] SSH-Verbindung zu %s fehlgeschlagen: %v", worker.Host, err)
		return false
	}
	defer client.Close()

	anyRestarted := false
	services := []string{serviceTransferJobManager, serviceTixstream}
	for _, unit := range services {
		out, queryErr := client.RunCommand(fmt.Sprintf("systemctl is-active %s", unit))
		state := strings.TrimSpace(out)
		if queryErr == nil && state == "active" {
			log.Printf("[HEALTH] Service %s auf %s: OK (active)", unit, worker.Host)
			continue
		}

		log.Printf("[HEALTH] Service %s auf %s ist '%s' - starte neu...", unit, worker.Host, state)
		sg.logGroundTruth("MFT_SERVICE_RESTART", worker.Host, unit, "N/A", false)

		restartOut, restartErr := client.RunCommand(fmt.Sprintf("sudo systemctl restart %s", unit))
		if restartErr != nil {
			log.Printf("[HEALTH] Restart von %s auf %s fehlgeschlagen: %v (output: %s)",
				unit, worker.Host, restartErr, restartOut)
			continue
		}
		log.Printf("[HEALTH] %s auf %s erfolgreich neugestartet", unit, worker.Host)
		anyRestarted = true
	}

	if anyRestarted {
		log.Printf("[HEALTH] Warte 15s auf Service-Startup auf %s...", worker.Host)
		time.Sleep(15 * time.Second)
	}
	return anyRestarted
}
// waitForJobCompletion polls the job's status every 5 seconds until it
// reaches a final state.
//
// Outcomes:
//   - status MFTStatusCompleted: returns nil;
//   - status 880: logged as a temporary error, polling continues;
//   - status >= MFTStatusFailed: returns a terminal-failure error;
//   - JobCompletionTimeout elapsed: returns a timeout error;
//   - ctx done: returns ctx.Err().
//
// Polling errors are counted; after MFTServiceRestartThreshold consecutive
// errors a service health check is run on the source worker and the counter
// is reset. Any successful poll also resets the counter.
func (sg *ScenarioGenerator) waitForJobCompletion(ctx context.Context, sourceWorker Worker, jobIDStr string) error {
	poll := time.NewTicker(5 * time.Second)
	defer poll.Stop()
	deadline := time.NewTimer(JobCompletionTimeout)
	defer deadline.Stop()

	log.Printf("Warte auf Job-Abschluss: %s", jobIDStr)

	failedPolls := 0
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()

		case <-deadline.C:
			return fmt.Errorf("job %s timed out nach %s", jobIDStr, JobCompletionTimeout)

		case <-poll.C:
			status, err := sg.fetchJobStatus(ctx, sourceWorker, jobIDStr)
			if err != nil {
				failedPolls++
				log.Printf("Polling-Fehler fuer Job %s (%d/%d): %v",
					jobIDStr, failedPolls, MFTServiceRestartThreshold, err)
				if failedPolls >= MFTServiceRestartThreshold {
					sg.checkAndRestartWorkerServices(sourceWorker)
					failedPolls = 0
				}
				continue
			}
			failedPolls = 0

			code := status.StatusCode
			log.Printf("Job %s: %s %s (%d)",
				jobIDStr, status.StatusMessage, status.StatusDetail, code)

			if code == MFTStatusCompleted {
				log.Printf("Job abgeschlossen: %s", jobIDStr)
				return nil
			}
			if code == 880 {
				log.Printf("Job %s: temporaerer Fehler (880), warte auf Auto-Retry...", jobIDStr)
				continue
			}
			if code >= MFTStatusFailed {
				return fmt.Errorf("job %s fehlgeschlagen (terminal): %s %s",
					jobIDStr, status.StatusMessage, status.StatusDetail)
			}
		}
	}
}
// triggerAndWait submits req and then blocks until the resulting job has
// finished. It returns the submission error if the job could not be
// triggered, otherwise the result of waiting for completion.
func (sg *ScenarioGenerator) triggerAndWait(ctx context.Context, req JobRequest) error {
	id, triggerErr := sg.triggerJobWithRetry(ctx, req)
	if triggerErr == nil {
		return sg.waitForJobCompletion(ctx, req.SourceWorker, id)
	}
	return triggerErr
}
// triggerJobOverSSHWithRetry starts the transfer of every requested file by
// running tixstream-client on the source worker over SSH.
//
// Up to MaxRetries attempts are made, waiting RetryBackoff * 2^attempt
// before each retry (or returning ctx.Err() if ctx ends first). A failed
// command ends the current attempt. The next attempt resumes with the file
// that failed, so transfers that were already started are not launched a
// second time.
//
// It returns nil only when the command succeeded for every file. FileURIs
// takes precedence over FileURI; with no files at all it returns nil once an
// SSH client is available.
//
// NOTE(review): file names are interpolated into the shell command unquoted;
// this is fine for the fixed URIs used in this file but not for untrusted input.
func (sg *ScenarioGenerator) triggerJobOverSSHWithRetry(ctx context.Context, req JobRequest) error {
	pending := req.FileURIs
	if len(pending) == 0 && req.FileURI != "" {
		pending = []string{req.FileURI}
	}

	var lastErr error
	for attempt := range MaxRetries {
		if attempt > 0 {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(RetryBackoff * time.Duration(1<<attempt)):
			}
		}

		client, ok := sg.sm.SSHManager[req.SourceWorker.ID]
		if !ok {
			lastErr = fmt.Errorf("kein SSH-Client für %s gefunden", req.SourceWorker.ID)
			continue
		}

		for len(pending) > 0 {
			file := pending[0]
			cmd := fmt.Sprintf(
				"/opt/tixel/tixstream/bin/tixstream-client start --address %s:59999 --peer %s:60002 --source local_sync_source --destination local_sync_destination --dentry %s",
				req.SourceWorker.ID, req.DestWorker.ID, file,
			)
			out, err := client.RunCommand(cmd)
			if err != nil {
				lastErr = fmt.Errorf("SSH command failed: %w (output: %s)", err, out)
				break
			}
			log.Printf("SSH Transfer gestartet: %s", file)
			// Reslicing only moves the local view; the caller's slice is untouched.
			pending = pending[1:]
		}

		// Success only if nothing is left; otherwise fall through to the next attempt.
		if len(pending) == 0 {
			return nil
		}
	}
	return fmt.Errorf("SSH job failed after %d attempts: %w", MaxRetries, lastErr)
}
// startWorkerPool launches WorkerPoolSize goroutines that take requests from
// jobQueue, log START_TRANSFER, run the job to completion and log
// TRANSFER_FAILED if it did not succeed.
//
// Every goroutine is registered with sg.wg and exits when ctx is done or the
// queue is closed.
func (sg *ScenarioGenerator) startWorkerPool(ctx context.Context) {
	consume := func(id int) {
		defer sg.wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case job, open := <-sg.jobQueue:
				if !open {
					return
				}
				src, dst := job.SourceWorker.Host, job.DestWorker.Host
				sg.logGroundTruth("START_TRANSFER", src, dst, job.WorkloadType, job.IsAnomaly)
				err := sg.triggerAndWait(ctx, job)
				if err == nil {
					continue
				}
				log.Printf("Worker %d: Job fehlgeschlagen: %v", id, err)
				sg.logGroundTruth("TRANSFER_FAILED", src, dst, job.WorkloadType, job.IsAnomaly)
			}
		}
	}

	for id := 0; id < WorkerPoolSize; id++ {
		sg.wg.Add(1)
		go consume(id)
	}
}
// intn returns a pseudo-random int in [0, n) from the generator's own
// source, holding randMutex for the duration of the call so that it can be
// used from several goroutines. Like rand.Intn, it panics if n <= 0.
func (sg *ScenarioGenerator) intn(n int) int {
	mu := &sg.randMutex
	mu.Lock()
	defer mu.Unlock()
	return sg.rand.Intn(n)
}
// float64n returns a pseudo-random float64 in [0.0, 1.0) from the
// generator's own source, holding randMutex for the duration of the call.
func (sg *ScenarioGenerator) float64n() float64 {
	mu := &sg.randMutex
	mu.Lock()
	defer mu.Unlock()
	return sg.rand.Float64()
}
// getMonitoredWorker returns the worker whose ID equals the configured
// MonitoredWorker. If no worker matches, the first worker in the list is
// returned as a fallback.
func (sg *ScenarioGenerator) getMonitoredWorker() Worker {
	wanted := sg.config.MonitoredWorker
	for i := range sg.workers {
		if sg.workers[i].ID == wanted {
			return sg.workers[i]
		}
	}
	return sg.workers[0]
}
// shouldGenerateBurst decides at random whether a burst should be injected
// now.
//
// It is always false when bursts are disabled or the mode is training. In
// testing mode the probability is AnomalyRate*0.5; in any other mode it is
// 0.2. No random number is drawn when the answer is a fixed false.
func (sg *ScenarioGenerator) shouldGenerateBurst() bool {
	cfg := &sg.config
	switch {
	case !cfg.EnableBurst, cfg.Mode == ModeTraining:
		return false
	case cfg.Mode == ModeTesting:
		return sg.float64n() < cfg.AnomalyRate*0.5
	default:
		return sg.float64n() < 0.2
	}
}
// runBurstWorkload queues a burst of 15-24 small-file transfers from the
// monitored worker to randomly chosen other workers, all labelled as
// anomalies.
//
// Destinations are drawn only from workers whose ID differs from the
// source, so a transfer never targets its own source regardless of which
// worker is monitored. With the default monitored worker (index 0) the
// candidates are the same as workers[1:]. If there is no other worker, the
// burst is skipped.
//
// BURST_START is logged before the first job and BURST_END after the last
// one has been queued. Jobs are spaced 200-499 ms apart. If ctx is done the
// function returns immediately, without logging BURST_END.
func (sg *ScenarioGenerator) runBurstWorkload(ctx context.Context) {
	source := sg.getMonitoredWorker()
	targets := make([]Worker, 0, len(sg.workers))
	for _, w := range sg.workers {
		if w.ID != source.ID {
			targets = append(targets, w)
		}
	}
	if len(targets) == 0 {
		log.Println("Burst übersprungen: kein Ziel-Worker verfügbar")
		return
	}

	log.Println("BURST-TRAFFIC erkannt! [ANOMALY]")
	sg.logGroundTruth("BURST_START", "ALL", "ALL", "BURST", true)

	numBurstJobs := 15 + sg.intn(10)
	for i := range numBurstJobs {
		select {
		case <-ctx.Done():
			return
		default:
		}

		dest := targets[sg.intn(len(targets))]
		fileNum := sg.intn(10) + 1
		req := JobRequest{
			SourceWorker: source,
			DestWorker:   dest,
			FileURI:      fmt.Sprintf("local_sync_source/small_%d.dat", fileNum),
			Description:  fmt.Sprintf("Burst File %d", i+1),
			WorkloadType: "BURST",
			IsAnomaly:    true,
		}

		// The queue is bounded; stop waiting for a free slot when ctx ends.
		select {
		case sg.jobQueue <- req:
		case <-ctx.Done():
			return
		}

		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Duration(200+sg.intn(300)) * time.Millisecond):
		}
	}
	sg.logGroundTruth("BURST_END", "ALL", "ALL", "BURST", true)
}
// runMaintenanceWorkload simulates a maintenance window: three sequential
// transfers of the 5g.mxf file from the monitored worker, distributed
// round-robin over the other workers.
//
// It does nothing when maintenance is disabled. The rows are labelled as
// anomalies in every mode except training. Destinations are taken only from
// workers whose ID differs from the source, so the backup never targets its
// own source; with the default monitored worker (index 0) the rotation is
// the same as over workers[1:]. If there is no other worker, the window is
// skipped.
//
// Failed jobs are logged and the next one is attempted. If ctx is done
// between jobs the function returns without logging MAINTENANCE_END.
func (sg *ScenarioGenerator) runMaintenanceWorkload(ctx context.Context) {
	if !sg.config.EnableMaintenance {
		return
	}

	source := sg.getMonitoredWorker()
	targets := make([]Worker, 0, len(sg.workers))
	for _, w := range sg.workers {
		if w.ID != source.ID {
			targets = append(targets, w)
		}
	}
	if len(targets) == 0 {
		log.Println("Wartungsfenster übersprungen: kein Ziel-Worker verfügbar")
		return
	}

	isAnomaly := sg.config.Mode != ModeTraining
	log.Println("WARTUNGSFENSTER - Backup")
	sg.logGroundTruth("MAINTENANCE_START", "ALL", "ALL", "MAINTENANCE", isAnomaly)

	for i := range 3 {
		select {
		case <-ctx.Done():
			return
		default:
		}

		req := JobRequest{
			SourceWorker: source,
			DestWorker:   targets[i%len(targets)],
			FileURI:      "local_sync_source/5g.mxf",
			Description:  fmt.Sprintf("Maintenance Backup %d", i+1),
			WorkloadType: "MAINTENANCE_BACKUP",
			IsAnomaly:    isAnomaly,
		}
		if err := sg.triggerAndWait(ctx, req); err != nil {
			log.Printf("Maintenance Job fehlgeschlagen: %v", err)
		}
	}
	sg.logGroundTruth("MAINTENANCE_END", "ALL", "ALL", "MAINTENANCE", isAnomaly)
}
// localSyncFiles prefixes every given file name with "local_sync_source/"
// and returns the resulting URIs in the same order. With no arguments it
// returns an empty, non-nil slice.
func localSyncFiles(names ...string) []string {
	uris := make([]string, 0, len(names))
	for _, n := range names {
		uris = append(uris, "local_sync_source/"+n)
	}
	return uris
}
// localSyncSeries returns the URIs
// "local_sync_source/<prefix><i>.dat" for every i in [from, to], in
// ascending order.
//
// When to < from the range is empty and an empty, non-nil slice is returned.
// The check is needed because the capacity to-from+1 would otherwise be
// negative, which makes make panic.
func localSyncSeries(prefix string, from, to int) []string {
	if to < from {
		return []string{}
	}
	result := make([]string, 0, to-from+1)
	for i := from; i <= to; i++ {
		result = append(result, fmt.Sprintf("local_sync_source/%s%d.dat", prefix, i))
	}
	return result
}
// Run executes the workload cycle until ctx is done.
//
// It first waits for every worker's job API (waitForAPI), logs
// SCENARIO_START and starts the worker pool. It then loops over six phases:
//
//	0 High Bandwidth (W0 -> W1)   1 High IOPS (W0 -> W2)
//	2 Interference (W1 -> W0)     3 Batch Out (W0 -> W2)
//	4 Batch In (W1 -> W0)         5 Idle, or a burst in testing mode
//
// W0 is the monitored worker; W1 and W2 are looked up by the fixed IDs
// "thesis-worker-1" and "thesis-worker-2". With a WorkloadFilter other than
// "all", the selected phase alternates with the idle phase.
//
// Job failures inside a phase are logged and do not stop the cycle. The
// function returns ctx.Err() once ctx is done, or a wrapped error if a worker
// did not become reachable.
func (sg *ScenarioGenerator) Run(ctx context.Context) error {
for _, worker := range sg.workers {
if err := sg.waitForAPI(ctx, worker); err != nil {
return fmt.Errorf("worker %s nicht erreichbar: %w", worker.Name, err)
}
}
log.Printf("Starte Szenario im %s-Modus", sg.config.Mode)
log.Printf(" Überwachter Worker: %s", sg.config.MonitoredWorker)
sg.logGroundTruth("SCENARIO_START", "ALL", "ALL", "START", false)
// NOTE(review): every call to Run starts a new worker pool; RunExperiment calls Run twice.
sg.startWorkerPool(ctx)
W0 := sg.getMonitoredWorker()
// NOTE(review): if the monitored worker is thesis-worker-1 or -2, W0 coincides with W1 or W2 — confirm this is intended.
var W1, W2 Worker
for _, w := range sg.workers {
switch w.ID {
case "thesis-worker-1":
W1 = w
case "thesis-worker-2":
W2 = w
}
}
// fixedPhase stays -1 for WorkloadAll; otherwise it holds the single phase to repeat.
initialPhase := 0
fixedPhase := -1
switch sg.config.WorkloadFilter {
case WorkloadHighBW:
initialPhase, fixedPhase = 0, 0
case WorkloadHighIOPS:
initialPhase, fixedPhase = 1, 1
case WorkloadInterference:
initialPhase, fixedPhase = 2, 2
case WorkloadBatchOut:
initialPhase, fixedPhase = 3, 3
case WorkloadBatchIn:
initialPhase, fixedPhase = 4, 4
case WorkloadIdle:
initialPhase, fixedPhase = 5, 5
}
if fixedPhase >= 0 {
log.Printf("WorkloadFilter aktiv: nur Phase '%s' (phase=%d) wird wiederholt", sg.config.WorkloadFilter, fixedPhase)
}
phase := initialPhase
for {
// Non-blocking cancellation check at the top of every phase.
select {
case <-ctx.Done():
log.Println("Shutdown-Signal empfangen")
return ctx.Err()
default:
}
// Outside training mode, a maintenance window precedes phase 0 with 5% probability.
if sg.config.EnableMaintenance && sg.config.Mode != ModeTraining && phase == 0 && sg.intn(100) < 5 {
sg.runMaintenanceWorkload(ctx)
}
switch phase {
case 0:
log.Println("PHASE 1: High Bandwidth (W0 -> W1)")
sg.logGroundTruth("START_PHASE_BW", W0.Host, W1.Host, "HIGH_BW", false)
if err := sg.triggerAndWait(ctx, JobRequest{
SourceWorker: W0,
DestWorker: W1,
FileURI: "local_sync_source/5g.mxf",
Description: "Thesis Phase 1: High BW",
WorkloadType: "HIGH_BW",
}); err != nil {
log.Printf("Phase 1 fehlgeschlagen: %v", err)
}
sg.logGroundTruth("END_PHASE_BW", W0.Host, W1.Host, "HIGH_BW", false)
// With a filter active, go to the idle phase next; otherwise advance to the following phase.
if fixedPhase >= 0 {
phase = 5
} else {
phase = 1
}
case 1:
log.Println("PHASE 2: High IOPS (W0 -> W2)")
sg.logGroundTruth("START_PHASE_IOPS", W0.Host, W2.Host, "HIGH_IOPS", false)
// 20 jobs are started 200 ms apart and run concurrently; the phase ends when all have finished.
var iopsWg sync.WaitGroup
for i := 1; i <= 20; i++ {
select {
case <-ctx.Done():
// NOTE(review): returns without waiting for the IOPS goroutines that were already started.
return ctx.Err()
default:
}
iopsWg.Add(1)
go func(fileIdx int) {
defer iopsWg.Done()
if err := sg.triggerAndWait(ctx, JobRequest{
SourceWorker: W0,
DestWorker: W2,
FileURI: fmt.Sprintf("local_sync_source/small_%d.dat", (fileIdx%10)+1),
Description: fmt.Sprintf("Thesis Phase 2: File %d", fileIdx),
WorkloadType: "HIGH_IOPS",
}); err != nil {
log.Printf("IOPS Job %d fehlgeschlagen: %v", fileIdx, err)
}
}(i)
// NOTE(review): this sleep does not observe ctx; cancellation is noticed on the next iteration.
time.Sleep(200 * time.Millisecond)
}
iopsWg.Wait()
sg.logGroundTruth("END_PHASE_IOPS", W0.Host, W2.Host, "HIGH_IOPS", false)
if fixedPhase >= 0 {
phase = 5
} else {
phase = 2
}
case 2:
log.Println("PHASE 3: Interferenz (W1 -> W0)")
sg.logGroundTruth("START_PHASE_INTERFERENCE", W1.Host, W0.Host, "INTERFERENCE", false)
if err := sg.triggerAndWait(ctx, JobRequest{
SourceWorker: W1,
DestWorker: W0,
FileURI: "local_sync_source/1g.mxf",
Description: "Thesis Phase 3: Interference",
WorkloadType: "INTERFERENCE",
}); err != nil {
log.Printf("Phase 3 fehlgeschlagen: %v", err)
}
// Incoming transfers land on the monitored worker, so old files are purged there afterwards.
sg.cleanupRemote()
sg.logGroundTruth("END_PHASE_INTERFERENCE", W1.Host, W0.Host, "INTERFERENCE", false)
if fixedPhase >= 0 {
phase = 5
} else {
phase = 3
}
case 3:
log.Println("PHASE 4: Batch Transfer OUT (W0 -> W2)")
sg.logGroundTruth("START_PHASE_BATCH_OUT", W0.Host, W2.Host, "BATCH_IOPS_OUT", false)
// A single job carrying small_1.dat .. small_20.dat.
if err := sg.triggerAndWait(ctx, JobRequest{
SourceWorker: W0,
DestWorker: W2,
FileURIs: localSyncSeries("small_", 1, 20),
Description: "Thesis Phase 4: Bulk Transfer Out",
WorkloadType: "BATCH_IOPS_OUT",
}); err != nil {
log.Printf("Batch Out fehlgeschlagen: %v", err)
}
sg.logGroundTruth("END_PHASE_BATCH_OUT", W0.Host, W2.Host, "BATCH_IOPS_OUT", false)
if fixedPhase >= 0 {
phase = 5
} else {
phase = 4
}
case 4:
log.Println("PHASE 5: Batch Transfer IN (W1 -> W0)")
sg.logGroundTruth("START_PHASE_BATCH_IN", W1.Host, W0.Host, "BATCH_IOPS_IN", false)
if err := sg.triggerAndWait(ctx, JobRequest{
SourceWorker: W1,
DestWorker: W0,
FileURIs: localSyncSeries("small_", 1, 20),
Description: "Thesis Phase 5: Bulk Transfer In",
WorkloadType: "BATCH_IOPS_IN",
}); err != nil {
log.Printf("Batch In fehlgeschlagen: %v", err)
}
sg.cleanupRemote()
sg.logGroundTruth("END_PHASE_BATCH_IN", W1.Host, W0.Host, "BATCH_IOPS_IN", false)
// NOTE(review): both branches assign 5; the condition is redundant here.
if fixedPhase >= 0 {
phase = 5
} else {
phase = 5
}
case 5:
// In testing mode the idle period may be replaced by a burst; otherwise idle for 60 s.
if sg.config.Mode == ModeTesting && sg.shouldGenerateBurst() {
sg.runBurstWorkload(ctx)
} else {
log.Println("PHASE 6: IDLE (Baseline)")
sg.logGroundTruth("IDLE", "ALL", "ALL", "IDLE", false)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(60 * time.Second):
}
}
// Restart the cycle: back to the pinned phase if a filter is active, else to phase 0.
phase = max(fixedPhase, 0)
sg.cleanupRemote()
log.Println("Zyklus abgeschlossen, starte von vorne...")
}
}
}
// RunExperiment runs the complete experiment: a baseline phase followed by
// one chaos/cooldown pair per scenario.
//
// Phase 1 runs the normal workload (Run) for BaselineDuration. After a 10 s
// pause, phase 2 keeps the workload running in the background and, for each
// scenario ID, starts the scenario, waits ChaosDuration, stops it and waits
// CooldownDuration. If no scenarios are configured, all scenarios known to
// the scenario manager are used.
//
// Every phase and scenario boundary is written to the ground-truth log. A
// scenario that fails to start is skipped; its EXPERIMENT_CHAOS_START row is
// closed with an EXPERIMENT_CHAOS_END row right away so that the log never
// contains an open-ended anomaly interval for it.
//
// It returns nil when all scenarios have been processed, ctx.Err() when ctx
// ends early (any active scenario and the background workload are stopped
// first), or a wrapped error if the baseline fails for a reason other than
// cancellation or timeout.
func (sg *ScenarioGenerator) RunExperiment(ctx context.Context) error {
	exp := sg.config.Experiment
	scenarios := exp.Scenarios
	if len(scenarios) == 0 {
		for _, s := range sg.sm.GetScenarios() {
			scenarios = append(scenarios, s.ID)
		}
	}

	totalDuration := exp.BaselineDuration + time.Duration(len(scenarios))*(exp.ChaosDuration+exp.CooldownDuration)
	log.Printf("Experiment gestartet. Geschätzte Gesamtdauer: %.1f Stunden", totalDuration.Hours())
	log.Printf(" Baseline: %.0f Min | Chaos/Szenario: %.0f Min | Cooldown: %.0f Min | Szenarien: %d",
		exp.BaselineDuration.Minutes(), exp.ChaosDuration.Minutes(), exp.CooldownDuration.Minutes(), len(scenarios))

	log.Printf(" PHASE 1: BASELINE (%.0f Min)", exp.BaselineDuration.Minutes())
	sg.logGroundTruth("EXPERIMENT_BASELINE_START", "ALL", "ALL", "EXPERIMENT", false)

	baselineCtx, baselineCancel := context.WithTimeout(ctx, exp.BaselineDuration)
	err := sg.Run(baselineCtx)
	baselineCancel()
	// Run may return the context error wrapped (e.g. while still waiting for
	// a worker), so match with errors.Is instead of comparing sentinels.
	if err != nil && !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
		return fmt.Errorf("baseline fehlgeschlagen: %w", err)
	}
	sg.logGroundTruth("EXPERIMENT_BASELINE_END", "ALL", "ALL", "EXPERIMENT", false)

	log.Println("Baseline abgeschlossen. 10s Pause vor Chaos-Phase...")
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(10 * time.Second):
	}

	log.Printf(" PHASE 2: CHAOS INJEKTION (%d Szenarien)", len(scenarios))
	chaosWorkloadCtx, chaosWorkloadCancel := context.WithCancel(ctx)
	defer chaosWorkloadCancel()

	workloadDone := make(chan error, 1)
	go func() {
		workloadDone <- sg.Run(chaosWorkloadCtx)
	}()
	// stopWorkload cancels the background workload and waits for it to
	// return. It must be called exactly once per exit path.
	stopWorkload := func() {
		chaosWorkloadCancel()
		<-workloadDone
	}

	for i, scenarioID := range scenarios {
		select {
		case <-ctx.Done():
			sg.sm.StopScenario()
			stopWorkload()
			return ctx.Err()
		default:
		}

		log.Printf("[%d/%d] CHAOS START: %s", i+1, len(scenarios), scenarioID)
		sg.logGroundTruth("EXPERIMENT_CHAOS_START", "Orchestrator", "System", scenarioID, true)
		if err := sg.sm.StartScenario(scenarioID); err != nil {
			log.Printf("Fehler beim Starten von %s: %v überspringe", scenarioID, err)
			// Close the interval opened above; nothing was injected.
			sg.logGroundTruth("EXPERIMENT_CHAOS_END", "Orchestrator", "System", scenarioID, true)
			continue
		}

		select {
		case <-ctx.Done():
			sg.sm.StopScenario()
			stopWorkload()
			return ctx.Err()
		case <-time.After(exp.ChaosDuration):
		}

		sg.sm.StopScenario()
		sg.logGroundTruth("EXPERIMENT_CHAOS_END", "Orchestrator", "System", scenarioID, true)
		log.Printf("[%d/%d] CHAOS STOP: %s | Cooldown: %.0f Min", i+1, len(scenarios), scenarioID, exp.CooldownDuration.Minutes())

		select {
		case <-ctx.Done():
			stopWorkload()
			return ctx.Err()
		case <-time.After(exp.CooldownDuration):
		}
	}

	stopWorkload()
	log.Println(" EXPERIMENT ABGESCHLOSSEN")
	sg.logGroundTruth("EXPERIMENT_END", "ALL", "ALL", "EXPERIMENT", false)
	return nil
}
// startAPIServer starts the generator's HTTP control API on APIPort in the
// background and returns immediately.
//
// Endpoints:
//   - GET  /scenarios           lists the available scenarios as JSON
//   - POST /scenarios/start?id= starts the scenario with the given ID
//   - POST /scenarios/stop      stops the running scenario
//
// If the listener fails for any reason other than a regular shutdown, the
// error is logged and cancelFunc is called. When ctx is done the server is
// shut down gracefully with a 5 s limit.
func (sg *ScenarioGenerator) startAPIServer(ctx context.Context, cancelFunc context.CancelFunc) {
	mux := http.NewServeMux()

	mux.HandleFunc("/scenarios", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodGet {
			http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
			return
		}
		scenarios := sg.sm.GetScenarios()
		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(scenarios); err != nil {
			// The status line has already been sent, so the failure can only be logged.
			log.Printf("API: Szenario-Liste konnte nicht gesendet werden: %v", err)
		}
	})

	mux.HandleFunc("/scenarios/start", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
			return
		}
		id := r.URL.Query().Get("id")
		if id == "" {
			http.Error(w, "Missing id parameter", http.StatusBadRequest)
			return
		}
		if err := sg.sm.StartScenario(id); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		fmt.Fprintf(w, "Scenario %s started", id)
	})

	mux.HandleFunc("/scenarios/stop", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
			return
		}
		sg.sm.StopScenario()
		fmt.Fprintf(w, "Scenario stopped")
	})

	server := &http.Server{
		Addr:    ":" + APIPort,
		Handler: mux,
		// Bound the time a client may take to send its request headers so
		// that stalled connections cannot pile up.
		ReadHeaderTimeout: 10 * time.Second,
	}

	go func() {
		log.Printf("API Server gestartet auf Port %s", APIPort)
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Printf("API Server Fehler: %v", err)
			cancelFunc()
		}
	}()

	go func() {
		<-ctx.Done()
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := server.Shutdown(shutdownCtx); err != nil {
			log.Printf("API Server Shutdown-Fehler: %v", err)
		}
	}()
}
// main parses the command-line flags, builds the generator and scenario
// manager, starts the control API and then runs either the continuous
// workload or the full experiment until it finishes or SIGINT/SIGTERM
// arrives.
//
// Cleanup (stopping a running scenario, flushing and closing the
// ground-truth log) runs on every exit path once the generator exists. A
// failure after that point sets the exit code instead of calling log.Fatalf,
// because log.Fatalf would skip the deferred cleanup. On a signal, the
// process waits up to 30 s for both the workload goroutine and the worker
// pool before it shuts down.
func main() {
	exitCode := 0
	// Registered first so that it runs last, after all cleanup defers.
	defer func() {
		if exitCode != 0 {
			os.Exit(exitCode)
		}
	}()

	log.SetFlags(log.LstdFlags | log.Lshortfile)

	mode := flag.String("mode", "training", "Szenario-Modus: training, testing, anomaly-only")
	worker := flag.String("worker", "thesis-worker-0", "Überwachter Worker")
	burst := flag.Bool("burst", true, "Burst-Traffic aktivieren")
	maintenance := flag.Bool("maintenance", true, "Wartungsfenster aktivieren")
	anomalyRate := flag.Float64("anomaly-rate", 0.1, "Anomalie-Rate im Testing-Mode")
	inventoryPath := flag.String("inventory", "inventory.ini", "Pfad zur Ansible inventory.ini")
	runExperiment := flag.Bool("run-experiment", false, "Vollständiges Experiment ausführen (Baseline + Chaos)")
	baselineHours := flag.Float64("baseline-hours", 6, "Dauer der Baseline-Phase in Stunden")
	chaosMins := flag.Float64("chaos-mins", 20, "Dauer jedes aktiven Chaos-Szenarios in Minuten")
	cooldownMins := flag.Float64("cooldown-mins", 20, "Cooldown-Dauer zwischen Szenarien in Minuten")
	workload := flag.String("workload", "all", "Workload-Filter: all | high-bw | high-iops | interference | batch-out | batch-in | idle")
	flag.Parse()

	config := Config{
		Mode:              ScenarioMode(*mode),
		MonitoredWorker:   *worker,
		EnableBurst:       *burst,
		EnableMaintenance: *maintenance,
		AnomalyRate:       *anomalyRate,
		RunExperiment:     *runExperiment,
		WorkloadFilter:    WorkloadFilter(*workload),
		Experiment: ExperimentConfig{
			BaselineDuration: time.Duration(*baselineHours * float64(time.Hour)),
			ChaosDuration:    time.Duration(*chaosMins * float64(time.Minute)),
			CooldownDuration: time.Duration(*cooldownMins * float64(time.Minute)),
		},
	}

	// Nothing has been opened yet, so log.Fatalf is safe for these checks.
	if config.Mode != ModeTraining && config.Mode != ModeTesting && config.Mode != ModeAnomalyOnly {
		log.Fatalf("Ungültiger Modus: %s", config.Mode)
	}
	validWorkloads := map[WorkloadFilter]bool{
		WorkloadAll: true, WorkloadHighBW: true, WorkloadHighIOPS: true,
		WorkloadInterference: true, WorkloadBatchOut: true, WorkloadBatchIn: true,
		WorkloadIdle: true,
	}
	if !validWorkloads[config.WorkloadFilter] {
		log.Fatalf("Ungültiger Workload-Filter: %s (erlaubt: all, high-bw, high-iops, interference, batch-out, batch-in, idle)", config.WorkloadFilter)
	}

	log.Println("Thesis Scenario Generator gestartet")
	log.Printf(" Modus: %s | Experiment: %v | Workload: %s", config.Mode, config.RunExperiment, config.WorkloadFilter)

	inv, err := ParseInventory(*inventoryPath)
	if err != nil {
		log.Fatalf("Fehler beim Parsen des Inventory: %v", err)
	}
	generator, err := NewScenarioGenerator(config, inv)
	if err != nil {
		log.Fatalf("Fehler beim Initialisieren des Generators: %v", err)
	}

	sm, err := NewScenarioManager(inv, generator.logGroundTruth)
	if err != nil {
		log.Printf("Fehler beim Initialisieren des Scenario Managers: %v", err)
		if closeErr := generator.Close(); closeErr != nil {
			log.Printf("Fehler beim Schließen: %v", closeErr)
		}
		exitCode = 1
		return
	}
	generator.sm = sm
	defer func() {
		// Stop the scenario before closing the log: the manager was given
		// logGroundTruth as a callback and may still write rows while stopping.
		sm.StopScenario()
		if err := generator.Close(); err != nil {
			log.Printf("Fehler beim Schließen: %v", err)
		}
	}()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	generator.startAPIServer(ctx, cancel)

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigChan)

	errChan := make(chan error, 1)
	go func() {
		if config.RunExperiment {
			errChan <- generator.RunExperiment(ctx)
		} else {
			errChan <- generator.Run(ctx)
		}
	}()

	select {
	case sig := <-sigChan:
		log.Printf("Signal %v empfangen, fahre graceful herunter...", sig)
		cancel()
		done := make(chan struct{})
		go func() {
			// Wait for the workload itself as well as the pool, so that
			// nothing is still logging when the log file gets closed.
			<-errChan
			generator.wg.Wait()
			close(done)
		}()
		select {
		case <-done:
			log.Println("Alle Worker beendet")
		case <-time.After(30 * time.Second):
			log.Println("Shutdown Timeout erreicht")
		}
	case err := <-errChan:
		// The context error may arrive wrapped, so match with errors.Is.
		if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
			log.Printf("Generator-Fehler: %v", err)
			exitCode = 1
			return
		}
	}
	log.Println("Scenario Generator beendet")
}