1146 lines
31 KiB
Go
1146 lines
31 KiB
Go
package main
|
||
|
||
import (
|
||
"bytes"
|
||
"context"
|
||
"encoding/csv"
|
||
"encoding/json"
|
||
"flag"
|
||
"fmt"
|
||
"io"
|
||
"log"
|
||
"math/rand"
|
||
"net/http"
|
||
"net/url"
|
||
"os"
|
||
"os/signal"
|
||
"strings"
|
||
"sync"
|
||
"syscall"
|
||
"time"
|
||
)
|
||
|
||
// Network endpoints, credentials and file locations used by the generator.
const (
	// Port is the port of the transfer-job-manager REST API on every worker.
	Port = "60000"
	// APIPort is the port on which this generator serves its own control API.
	APIPort = "60001"
	// AuthHeader is the HTTP Basic "Authorization" value sent to the worker API.
	// NOTE(review): the credential is hard-coded in source; consider loading it
	// from the environment or a secrets file instead.
	AuthHeader = "Basic YWRtaW46dmVyeXNlY3JldA=="
	// LogFile is the CSV file that receives the ground-truth event log.
	LogFile = "./thesis_scenario_ground_truth.csv"
	// IncomingDir is the directory on the monitored worker pruned by cleanupRemote.
	IncomingDir = "/local_testdata/test-destination"
)

// Concurrency and retry tuning.
const (
	// WorkerPoolSize is the number of goroutines consuming the job queue.
	WorkerPoolSize = 10
	// MaxRetries is the number of attempts per HTTP or SSH job trigger.
	MaxRetries = 3
	// RetryBackoff is the base delay between trigger attempts.
	RetryBackoff = 2 * time.Second
)

// MFT job status handling.
const (
	// MFTStatusCompleted is the status code of a successfully finished job.
	MFTStatusCompleted = 870
	// MFTStatusFailed is the lowest status code treated as a terminal failure.
	MFTStatusFailed = 900

	// JobCompletionTimeout bounds how long to wait for a single job before giving up.
	JobCompletionTimeout = 5 * time.Minute
)
|
||
|
||
type ScenarioMode string
|
||
|
||
const (
|
||
ModeTraining ScenarioMode = "training"
|
||
ModeTesting ScenarioMode = "testing"
|
||
ModeAnomalyOnly ScenarioMode = "anomaly-only"
|
||
)
|
||
|
||
// ExperimentConfig describes the timing of a complete experiment, as executed
// by RunExperiment when the --run-experiment flag is given.
type ExperimentConfig struct {
	// BaselineDuration is how long the workload runs before any chaos is injected.
	BaselineDuration time.Duration
	// ChaosDuration is how long each chaos scenario stays active.
	ChaosDuration time.Duration
	// CooldownDuration is the pause after each scenario has been stopped.
	CooldownDuration time.Duration
	// Scenarios lists the scenario IDs to inject, in order. When empty, every
	// scenario known to the scenario manager is used.
	Scenarios []string
}
|
||
|
||
// Worker identifies one node of the thesis testbed.
type Worker struct {
	// Name is a human-readable label; it is also sent as a transfer's
	// destination_system.
	Name string
	// Host is the address used for API calls and log output (the inventory IP
	// when known, otherwise the ID).
	Host string
	// ID is the inventory name (e.g. "thesis-worker-0"), used as the key for
	// inventory and SSH-client lookups.
	ID string
}
|
||
|
||
// TransferJob is the JSON body POSTed to the transfer-job-manager API to
// create a job. The field order below determines the key order of the
// encoded JSON, so keep it stable.
type TransferJob struct {
	Description       string   `json:"description"`
	Schedule          string   `json:"schedule"`           // "NOW" for immediate execution
	DestinationSystem string   `json:"destination_system"` // destination worker's Name
	DestinationShare  string   `json:"destination_share"`
	SourceFileURIs    []string `json:"source_file_uris"`
	// Publishing/flat-file flags are forwarded verbatim to the API.
	WaitForPublish       bool `json:"wait_for_publish"`
	FlatFileModeDisabled bool `json:"flat_file_mode_disabled"`
}
|
||
|
||
// MFTJobResponse is the JSON returned by the transfer-job-manager API both
// when a job is created and when its status is queried.
type MFTJobResponse struct {
	ID    int    `json:"id"`
	JobID string `json:"job_id"` // identifier used in the status URL
	// StatusCode is compared against MFTStatusCompleted (done), 880 (transient
	// error) and MFTStatusFailed and above (terminal failure).
	StatusCode    int    `json:"status_code"`
	StatusMessage string `json:"status_message"`
	StatusDetail  string `json:"status_detail"`
}
|
||
|
||
type JobRequest struct {
|
||
SourceWorker Worker
|
||
DestWorker Worker
|
||
Schedule string
|
||
FileURI string
|
||
FileURIs []string
|
||
Description string
|
||
WorkloadType string
|
||
IsAnomaly bool
|
||
}
|
||
|
||
// WorkloadFilter restricts Run to a single workload phase (value of the
// -workload flag). WorkloadAll runs the complete phase cycle.
type WorkloadFilter string

// Accepted values of the -workload flag; each non-"all" value maps to one
// phase of Run.
const (
	WorkloadAll          WorkloadFilter = "all"
	WorkloadHighBW       WorkloadFilter = "high-bw"      // phase 0: large-file transfer
	WorkloadHighIOPS     WorkloadFilter = "high-iops"    // phase 1: many concurrent small files
	WorkloadInterference WorkloadFilter = "interference" // phase 2: inbound transfer to the monitored worker
	WorkloadBatchOut     WorkloadFilter = "batch-out"    // phase 3: multi-file job out of the monitored worker
	WorkloadBatchIn      WorkloadFilter = "batch-in"     // phase 4: multi-file job into the monitored worker
	WorkloadIdle         WorkloadFilter = "idle"         // phase 5: idle/burst phase only
)
|
||
|
||
type Config struct {
|
||
Mode ScenarioMode
|
||
MonitoredWorker string
|
||
EnableBurst bool
|
||
EnableMaintenance bool
|
||
AnomalyRate float64
|
||
RunExperiment bool
|
||
Experiment ExperimentConfig
|
||
WorkloadFilter WorkloadFilter
|
||
}
|
||
|
||
type ScenarioGenerator struct {
|
||
workers []Worker
|
||
config Config
|
||
logWriter *csv.Writer
|
||
logFile *os.File
|
||
logMutex sync.Mutex
|
||
httpClient *http.Client
|
||
rand *rand.Rand
|
||
randMutex sync.Mutex
|
||
jobQueue chan JobRequest
|
||
wg sync.WaitGroup
|
||
sm *ScenarioManager
|
||
}
|
||
|
||
func NewScenarioGenerator(config Config, inv *Inventory) (*ScenarioGenerator, error) {
|
||
var workers []Worker
|
||
for i := range 3 {
|
||
id := fmt.Sprintf("thesis-worker-%d", i)
|
||
host := id
|
||
if inv != nil {
|
||
if h, ok := inv.Workers[id]; ok {
|
||
if h.IP != "" {
|
||
host = h.IP
|
||
}
|
||
}
|
||
}
|
||
workers = append(workers, Worker{
|
||
Name: fmt.Sprintf("Worker Node %d", i),
|
||
Host: host,
|
||
ID: id,
|
||
})
|
||
}
|
||
|
||
logFile, err := os.OpenFile(LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("kann Log-Datei nicht öffnen: %w", err)
|
||
}
|
||
|
||
stat, _ := logFile.Stat()
|
||
writer := csv.NewWriter(logFile)
|
||
if stat.Size() == 0 {
|
||
if err := writer.Write([]string{
|
||
"timestamp", "action", "source", "target", "workload_type", "is_anomaly", "scenario_mode",
|
||
}); err != nil {
|
||
logFile.Close()
|
||
return nil, fmt.Errorf("kann Header nicht schreiben: %w", err)
|
||
}
|
||
writer.Flush()
|
||
}
|
||
|
||
return &ScenarioGenerator{
|
||
workers: workers,
|
||
config: config,
|
||
logWriter: writer,
|
||
logFile: logFile,
|
||
httpClient: &http.Client{Timeout: 30 * time.Second},
|
||
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
|
||
jobQueue: make(chan JobRequest, 100),
|
||
sm: nil,
|
||
}, nil
|
||
}
|
||
|
||
func (sg *ScenarioGenerator) Close() error {
|
||
sg.logMutex.Lock()
|
||
defer sg.logMutex.Unlock()
|
||
sg.logWriter.Flush()
|
||
return sg.logFile.Close()
|
||
}
|
||
|
||
func (sg *ScenarioGenerator) logGroundTruth(action, source, target, workloadType string, isAnomaly bool) {
|
||
sg.logMutex.Lock()
|
||
defer sg.logMutex.Unlock()
|
||
|
||
timestamp := time.Now().Format("2006-01-02 15:04:05")
|
||
anomalyStr := "0"
|
||
if isAnomaly {
|
||
anomalyStr = "1"
|
||
}
|
||
|
||
if err := sg.logWriter.Write([]string{
|
||
timestamp, action, source, target, workloadType, anomalyStr, string(sg.config.Mode),
|
||
}); err != nil {
|
||
log.Printf("Fehler beim Schreiben in Log: %v", err)
|
||
return
|
||
}
|
||
sg.logWriter.Flush()
|
||
|
||
anomalyMarker := ""
|
||
if isAnomaly {
|
||
anomalyMarker = " [ANOMALY]"
|
||
}
|
||
log.Printf("[GT] %s | %s: %s -> %s (%s)%s", timestamp, action, source, target, workloadType, anomalyMarker)
|
||
}
|
||
|
||
func (sg *ScenarioGenerator) cleanupRemote() {
|
||
monitoredHost := sg.config.MonitoredWorker
|
||
log.Printf("Bereinige Incoming-Ordner auf %s (via SSH)...", monitoredHost)
|
||
|
||
cmd := fmt.Sprintf("sudo find %s -type f -mmin +10 -delete", IncomingDir)
|
||
client, ok := sg.sm.SSHManager[monitoredHost]
|
||
if !ok {
|
||
log.Printf("Fehler: Kein SSH-Client für %s gefunden", monitoredHost)
|
||
return
|
||
}
|
||
|
||
out, err := client.RunCommand(cmd)
|
||
if err != nil {
|
||
log.Printf("Remote Cleanup fehlgeschlagen: %v (Output: %s)", err, out)
|
||
return
|
||
}
|
||
log.Println("Remote Cleanup erfolgreich abgeschlossen.")
|
||
}
|
||
|
||
func (sg *ScenarioGenerator) waitForAPI(ctx context.Context, worker Worker) error {
|
||
apiURL := fmt.Sprintf("http://%s:%s/transfer-job-manager/v1/jobs", worker.Host, Port)
|
||
log.Printf("Prüfe Verfügbarkeit von %s (%s)...", worker.Name, worker.Host)
|
||
|
||
ticker := time.NewTicker(10 * time.Second)
|
||
defer ticker.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
case <-ticker.C:
|
||
req, err := http.NewRequestWithContext(ctx, "GET", apiURL, nil)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
req.Header.Set("Authorization", AuthHeader)
|
||
|
||
resp, err := sg.httpClient.Do(req)
|
||
if err == nil && resp.StatusCode == 200 {
|
||
resp.Body.Close()
|
||
log.Printf("%s ist ONLINE", worker.Name)
|
||
return nil
|
||
}
|
||
if resp != nil {
|
||
resp.Body.Close()
|
||
}
|
||
log.Printf("Warte auf %s... Retry in 10s", worker.Name)
|
||
}
|
||
}
|
||
}
|
||
|
||
func (sg *ScenarioGenerator) triggerJobWithRetry(ctx context.Context, req JobRequest) (string, error) {
|
||
const maxServiceRecoveryAttempts = 2
|
||
|
||
for recoveryAttempt := range maxServiceRecoveryAttempts + 1 {
|
||
jobID, err := sg.tryTriggerJob(ctx, req)
|
||
if err == nil {
|
||
return jobID, nil
|
||
}
|
||
|
||
if recoveryAttempt >= maxServiceRecoveryAttempts {
|
||
return "", fmt.Errorf("job failed after service recovery: %w", err)
|
||
}
|
||
|
||
log.Printf("[TRIGGER] Alle %d Versuche fehlgeschlagen, starte Health-Check auf %s...",
|
||
MaxRetries, req.SourceWorker.Host)
|
||
sg.checkAndRestartWorkerServices(req.SourceWorker)
|
||
}
|
||
|
||
return "", fmt.Errorf("job trigger fehlgeschlagen nach Service-Recovery")
|
||
}
|
||
|
||
// tryTriggerJob fuehrt MaxRetries POST-Versuche durch ohne Service-Recovery.
|
||
func (sg *ScenarioGenerator) tryTriggerJob(ctx context.Context, req JobRequest) (string, error) {
|
||
var lastErr error
|
||
|
||
for attempt := range MaxRetries {
|
||
if attempt > 0 {
|
||
select {
|
||
case <-ctx.Done():
|
||
return "", ctx.Err()
|
||
case <-time.After(RetryBackoff * time.Duration(attempt)):
|
||
}
|
||
}
|
||
|
||
sourceFiles := req.FileURIs
|
||
if len(sourceFiles) == 0 && req.FileURI != "" {
|
||
sourceFiles = []string{req.FileURI}
|
||
}
|
||
|
||
job := TransferJob{
|
||
Description: req.Description,
|
||
Schedule: "NOW",
|
||
DestinationSystem: req.DestWorker.Name,
|
||
DestinationShare: "local_sync_destination",
|
||
SourceFileURIs: sourceFiles,
|
||
WaitForPublish: false,
|
||
FlatFileModeDisabled: false,
|
||
}
|
||
|
||
jsonData, err := json.Marshal(job)
|
||
if err != nil {
|
||
return "", fmt.Errorf("marshal error: %w", err)
|
||
}
|
||
|
||
apiURL := fmt.Sprintf("http://%s:%s/transfer-job-manager/v1/jobs", req.SourceWorker.Host, Port)
|
||
httpReq, err := http.NewRequestWithContext(ctx, "POST", apiURL, bytes.NewBuffer(jsonData))
|
||
if err != nil {
|
||
return "", fmt.Errorf("request creation error: %w", err)
|
||
}
|
||
httpReq.Header.Set("Authorization", AuthHeader)
|
||
httpReq.Header.Set("Content-Type", "application/json")
|
||
|
||
resp, err := sg.httpClient.Do(httpReq)
|
||
if err != nil {
|
||
lastErr = err
|
||
log.Printf("Fehler bei Job-Trigger (Versuch %d/%d): %v", attempt+1, MaxRetries, err)
|
||
continue
|
||
}
|
||
|
||
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
|
||
var result MFTJobResponse
|
||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||
resp.Body.Close()
|
||
return "", fmt.Errorf("response decode error: %w", err)
|
||
}
|
||
resp.Body.Close()
|
||
log.Printf("Job submitted: id=%d job_id=%s", result.ID, result.JobID)
|
||
return result.JobID, nil
|
||
}
|
||
|
||
io.Copy(io.Discard, resp.Body)
|
||
resp.Body.Close()
|
||
lastErr = fmt.Errorf("HTTP %d", resp.StatusCode)
|
||
log.Printf("Job fehlgeschlagen mit Status %d (Versuch %d/%d)", resp.StatusCode, attempt+1, MaxRetries)
|
||
}
|
||
|
||
return "", fmt.Errorf("job failed after %d attempts: %w", MaxRetries, lastErr)
|
||
}
|
||
|
||
func (sg *ScenarioGenerator) fetchJobStatus(ctx context.Context, sourceWorker Worker, jobIDStr string) (*MFTJobResponse, error) {
|
||
encodedJobID := url.PathEscape(jobIDStr)
|
||
apiURL := fmt.Sprintf("http://%s:%s/transfer-job-manager/v1/jobs/%s",
|
||
sourceWorker.Host, Port, encodedJobID)
|
||
|
||
httpReq, err := http.NewRequestWithContext(ctx, "GET", apiURL, nil)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
httpReq.Header.Set("Authorization", AuthHeader)
|
||
|
||
resp, err := sg.httpClient.Do(httpReq)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
if resp.StatusCode != 200 {
|
||
return nil, fmt.Errorf("HTTP %d für job %s", resp.StatusCode, jobIDStr)
|
||
}
|
||
|
||
var result MFTJobResponse
|
||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||
return nil, fmt.Errorf("decode error: %w", err)
|
||
}
|
||
return &result, nil
|
||
}
|
||
|
||
const (
	// MFTServiceRestartThreshold is the number of consecutive status-polling
	// failures after which waitForJobCompletion runs a service health check.
	MFTServiceRestartThreshold = 3

	// systemd units checked (and restarted when not active) by
	// checkAndRestartWorkerServices.
	serviceTransferJobManager = "transfer-job-manager"
	serviceTixstream          = "tixstream"
)
|
||
|
||
func (sg *ScenarioGenerator) checkAndRestartWorkerServices(worker Worker) bool {
|
||
log.Printf("[HEALTH] Pruefe Services auf %s via SSH...", worker.Host)
|
||
|
||
hostCfg, ok := sg.sm.Inventory.Workers[worker.ID]
|
||
if !ok {
|
||
log.Printf("[HEALTH] Worker %s nicht in Inventory gefunden", worker.ID)
|
||
return false
|
||
}
|
||
|
||
client, err := NewSSHClient(hostCfg)
|
||
if err != nil {
|
||
log.Printf("[HEALTH] SSH-Verbindung zu %s fehlgeschlagen: %v", worker.Host, err)
|
||
return false
|
||
}
|
||
defer client.Close()
|
||
|
||
restarted := false
|
||
|
||
for _, svc := range []string{serviceTransferJobManager, serviceTixstream} {
|
||
out, err := client.RunCommand(fmt.Sprintf("systemctl is-active %s", svc))
|
||
status := strings.TrimSpace(out)
|
||
|
||
if err != nil || status != "active" {
|
||
log.Printf("[HEALTH] Service %s auf %s ist '%s' - starte neu...", svc, worker.Host, status)
|
||
sg.logGroundTruth("MFT_SERVICE_RESTART", worker.Host, svc, "N/A", false)
|
||
|
||
restartOut, restartErr := client.RunCommand(fmt.Sprintf("sudo systemctl restart %s", svc))
|
||
if restartErr != nil {
|
||
log.Printf("[HEALTH] Restart von %s auf %s fehlgeschlagen: %v (output: %s)",
|
||
svc, worker.Host, restartErr, restartOut)
|
||
continue
|
||
}
|
||
|
||
log.Printf("[HEALTH] %s auf %s erfolgreich neugestartet", svc, worker.Host)
|
||
restarted = true
|
||
} else {
|
||
log.Printf("[HEALTH] Service %s auf %s: OK (active)", svc, worker.Host)
|
||
}
|
||
}
|
||
|
||
if restarted {
|
||
log.Printf("[HEALTH] Warte 15s auf Service-Startup auf %s...", worker.Host)
|
||
time.Sleep(15 * time.Second)
|
||
}
|
||
|
||
return restarted
|
||
}
|
||
|
||
func (sg *ScenarioGenerator) waitForJobCompletion(ctx context.Context, sourceWorker Worker, jobIDStr string) error {
|
||
ticker := time.NewTicker(5 * time.Second)
|
||
defer ticker.Stop()
|
||
timeout := time.After(JobCompletionTimeout)
|
||
|
||
log.Printf("Warte auf Job-Abschluss: %s", jobIDStr)
|
||
|
||
consecutiveErrors := 0
|
||
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
case <-timeout:
|
||
return fmt.Errorf("job %s timed out nach %s", jobIDStr, JobCompletionTimeout)
|
||
case <-ticker.C:
|
||
result, err := sg.fetchJobStatus(ctx, sourceWorker, jobIDStr)
|
||
if err != nil {
|
||
consecutiveErrors++
|
||
log.Printf("Polling-Fehler fuer Job %s (%d/%d): %v",
|
||
jobIDStr, consecutiveErrors, MFTServiceRestartThreshold, err)
|
||
|
||
if consecutiveErrors >= MFTServiceRestartThreshold {
|
||
sg.checkAndRestartWorkerServices(sourceWorker)
|
||
consecutiveErrors = 0
|
||
}
|
||
continue
|
||
}
|
||
|
||
consecutiveErrors = 0
|
||
|
||
log.Printf("Job %s: %s – %s (%d)",
|
||
jobIDStr, result.StatusMessage, result.StatusDetail, result.StatusCode)
|
||
|
||
switch {
|
||
case result.StatusCode == MFTStatusCompleted:
|
||
log.Printf("Job abgeschlossen: %s", jobIDStr)
|
||
return nil
|
||
case result.StatusCode == 880:
|
||
log.Printf("Job %s: temporaerer Fehler (880), warte auf Auto-Retry...", jobIDStr)
|
||
case result.StatusCode >= MFTStatusFailed:
|
||
return fmt.Errorf("job %s fehlgeschlagen (terminal): %s – %s",
|
||
jobIDStr, result.StatusMessage, result.StatusDetail)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
func (sg *ScenarioGenerator) triggerAndWait(ctx context.Context, req JobRequest) error {
|
||
jobIDStr, err := sg.triggerJobWithRetry(ctx, req)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return sg.waitForJobCompletion(ctx, req.SourceWorker, jobIDStr)
|
||
}
|
||
|
||
func (sg *ScenarioGenerator) triggerJobOverSSHWithRetry(ctx context.Context, req JobRequest) error {
|
||
var lastErr error
|
||
|
||
for attempt := range MaxRetries {
|
||
if attempt > 0 {
|
||
select {
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
case <-time.After(RetryBackoff * time.Duration(1<<attempt)):
|
||
}
|
||
}
|
||
|
||
client, ok := sg.sm.SSHManager[req.SourceWorker.ID]
|
||
if !ok {
|
||
lastErr = fmt.Errorf("kein SSH-Client für %s gefunden", req.SourceWorker.ID)
|
||
continue
|
||
}
|
||
|
||
sourceFiles := req.FileURIs
|
||
if len(sourceFiles) == 0 && req.FileURI != "" {
|
||
sourceFiles = []string{req.FileURI}
|
||
}
|
||
|
||
for _, file := range sourceFiles {
|
||
cmd := fmt.Sprintf(
|
||
"/opt/tixel/tixstream/bin/tixstream-client start --address %s:59999 --peer %s:60002 --source local_sync_source --destination local_sync_destination --dentry %s",
|
||
req.SourceWorker.ID, req.DestWorker.ID, file,
|
||
)
|
||
out, err := client.RunCommand(cmd)
|
||
if err != nil {
|
||
lastErr = fmt.Errorf("SSH command failed: %w (output: %s)", err, out)
|
||
break
|
||
}
|
||
log.Printf("SSH Transfer gestartet: %s", file)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
return fmt.Errorf("SSH job failed after %d attempts: %w", MaxRetries, lastErr)
|
||
}
|
||
|
||
func (sg *ScenarioGenerator) startWorkerPool(ctx context.Context) {
|
||
for i := range WorkerPoolSize {
|
||
sg.wg.Add(1)
|
||
go func(workerID int) {
|
||
defer sg.wg.Done()
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case req, ok := <-sg.jobQueue:
|
||
if !ok {
|
||
return
|
||
}
|
||
sg.logGroundTruth("START_TRANSFER", req.SourceWorker.Host, req.DestWorker.Host, req.WorkloadType, req.IsAnomaly)
|
||
if err := sg.triggerAndWait(ctx, req); err != nil {
|
||
log.Printf("Worker %d: Job fehlgeschlagen: %v", workerID, err)
|
||
sg.logGroundTruth("TRANSFER_FAILED", req.SourceWorker.Host, req.DestWorker.Host, req.WorkloadType, req.IsAnomaly)
|
||
}
|
||
}
|
||
}
|
||
}(i)
|
||
}
|
||
}
|
||
|
||
func (sg *ScenarioGenerator) intn(n int) int {
|
||
sg.randMutex.Lock()
|
||
defer sg.randMutex.Unlock()
|
||
return sg.rand.Intn(n)
|
||
}
|
||
|
||
func (sg *ScenarioGenerator) float64n() float64 {
|
||
sg.randMutex.Lock()
|
||
defer sg.randMutex.Unlock()
|
||
return sg.rand.Float64()
|
||
}
|
||
|
||
func (sg *ScenarioGenerator) getMonitoredWorker() Worker {
|
||
for _, w := range sg.workers {
|
||
if w.ID == sg.config.MonitoredWorker {
|
||
return w
|
||
}
|
||
}
|
||
return sg.workers[0]
|
||
}
|
||
|
||
func (sg *ScenarioGenerator) shouldGenerateBurst() bool {
|
||
if !sg.config.EnableBurst || sg.config.Mode == ModeTraining {
|
||
return false
|
||
}
|
||
if sg.config.Mode == ModeTesting {
|
||
return sg.float64n() < sg.config.AnomalyRate*0.5
|
||
}
|
||
return sg.float64n() < 0.2
|
||
}
|
||
|
||
func (sg *ScenarioGenerator) runBurstWorkload(ctx context.Context) {
|
||
log.Println("BURST-TRAFFIC erkannt! [ANOMALY]")
|
||
sg.logGroundTruth("BURST_START", "ALL", "ALL", "BURST", true)
|
||
|
||
W0 := sg.getMonitoredWorker()
|
||
numBurstJobs := 15 + sg.intn(10)
|
||
|
||
for i := range numBurstJobs {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
default:
|
||
}
|
||
|
||
destIdx := 1 + sg.intn(len(sg.workers)-1)
|
||
dest := sg.workers[destIdx]
|
||
fileNum := sg.intn(10) + 1
|
||
|
||
req := JobRequest{
|
||
SourceWorker: W0,
|
||
DestWorker: dest,
|
||
FileURI: fmt.Sprintf("local_sync_source/small_%d.dat", fileNum),
|
||
Description: fmt.Sprintf("Burst File %d", i+1),
|
||
WorkloadType: "BURST",
|
||
IsAnomaly: true,
|
||
}
|
||
|
||
select {
|
||
case sg.jobQueue <- req:
|
||
case <-ctx.Done():
|
||
return
|
||
}
|
||
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case <-time.After(time.Duration(200+sg.intn(300)) * time.Millisecond):
|
||
}
|
||
}
|
||
|
||
sg.logGroundTruth("BURST_END", "ALL", "ALL", "BURST", true)
|
||
}
|
||
|
||
func (sg *ScenarioGenerator) runMaintenanceWorkload(ctx context.Context) {
|
||
if !sg.config.EnableMaintenance {
|
||
return
|
||
}
|
||
|
||
isAnomaly := sg.config.Mode != ModeTraining
|
||
log.Println("WARTUNGSFENSTER - Backup")
|
||
sg.logGroundTruth("MAINTENANCE_START", "ALL", "ALL", "MAINTENANCE", isAnomaly)
|
||
|
||
W0 := sg.getMonitoredWorker()
|
||
for i := range 3 {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
default:
|
||
}
|
||
|
||
dest := sg.workers[1+(i%(len(sg.workers)-1))]
|
||
req := JobRequest{
|
||
SourceWorker: W0,
|
||
DestWorker: dest,
|
||
FileURI: "local_sync_source/5g.mxf",
|
||
Description: fmt.Sprintf("Maintenance Backup %d", i+1),
|
||
WorkloadType: "MAINTENANCE_BACKUP",
|
||
IsAnomaly: isAnomaly,
|
||
}
|
||
|
||
if err := sg.triggerAndWait(ctx, req); err != nil {
|
||
log.Printf("Maintenance Job fehlgeschlagen: %v", err)
|
||
}
|
||
}
|
||
|
||
sg.logGroundTruth("MAINTENANCE_END", "ALL", "ALL", "MAINTENANCE", isAnomaly)
|
||
}
|
||
|
||
// localSyncFiles prefixes each name with "local_sync_source/", the source
// share path used in transfer requests. The result keeps the order of names;
// with no names it is an empty, non-nil slice.
func localSyncFiles(names ...string) []string {
	paths := make([]string, 0, len(names))
	for _, n := range names {
		paths = append(paths, "local_sync_source/"+n)
	}
	return paths
}
|
||
|
||
// localSyncSeries returns "local_sync_source/<prefix><i>.dat" for every i in
// the inclusive range [from, to], in ascending order.
//
// An empty range (to < from) yields an empty, non-nil slice. Previously any
// to < from-1 panicked, because the capacity passed to make was negative.
func localSyncSeries(prefix string, from, to int) []string {
	if to < from {
		return []string{}
	}
	result := make([]string, 0, to-from+1)
	for i := from; i <= to; i++ {
		result = append(result, fmt.Sprintf("local_sync_source/%s%d.dat", prefix, i))
	}
	return result
}
|
||
|
||
func (sg *ScenarioGenerator) Run(ctx context.Context) error {
|
||
for _, worker := range sg.workers {
|
||
if err := sg.waitForAPI(ctx, worker); err != nil {
|
||
return fmt.Errorf("worker %s nicht erreichbar: %w", worker.Name, err)
|
||
}
|
||
}
|
||
|
||
log.Printf("Starte Szenario im %s-Modus", sg.config.Mode)
|
||
log.Printf(" Überwachter Worker: %s", sg.config.MonitoredWorker)
|
||
sg.logGroundTruth("SCENARIO_START", "ALL", "ALL", "START", false)
|
||
|
||
sg.startWorkerPool(ctx)
|
||
|
||
W0 := sg.getMonitoredWorker()
|
||
var W1, W2 Worker
|
||
for _, w := range sg.workers {
|
||
switch w.ID {
|
||
case "thesis-worker-1":
|
||
W1 = w
|
||
case "thesis-worker-2":
|
||
W2 = w
|
||
}
|
||
}
|
||
|
||
initialPhase := 0
|
||
fixedPhase := -1
|
||
|
||
switch sg.config.WorkloadFilter {
|
||
case WorkloadHighBW:
|
||
initialPhase, fixedPhase = 0, 0
|
||
case WorkloadHighIOPS:
|
||
initialPhase, fixedPhase = 1, 1
|
||
case WorkloadInterference:
|
||
initialPhase, fixedPhase = 2, 2
|
||
case WorkloadBatchOut:
|
||
initialPhase, fixedPhase = 3, 3
|
||
case WorkloadBatchIn:
|
||
initialPhase, fixedPhase = 4, 4
|
||
case WorkloadIdle:
|
||
initialPhase, fixedPhase = 5, 5
|
||
}
|
||
|
||
if fixedPhase >= 0 {
|
||
log.Printf("WorkloadFilter aktiv: nur Phase '%s' (phase=%d) wird wiederholt", sg.config.WorkloadFilter, fixedPhase)
|
||
}
|
||
|
||
phase := initialPhase
|
||
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
log.Println("Shutdown-Signal empfangen")
|
||
return ctx.Err()
|
||
default:
|
||
}
|
||
|
||
if sg.config.EnableMaintenance && sg.config.Mode != ModeTraining && phase == 0 && sg.intn(100) < 5 {
|
||
sg.runMaintenanceWorkload(ctx)
|
||
}
|
||
|
||
switch phase {
|
||
case 0:
|
||
log.Println("PHASE 1: High Bandwidth (W0 -> W1)")
|
||
sg.logGroundTruth("START_PHASE_BW", W0.Host, W1.Host, "HIGH_BW", false)
|
||
|
||
if err := sg.triggerAndWait(ctx, JobRequest{
|
||
SourceWorker: W0,
|
||
DestWorker: W1,
|
||
FileURI: "local_sync_source/5g.mxf",
|
||
Description: "Thesis Phase 1: High BW",
|
||
WorkloadType: "HIGH_BW",
|
||
}); err != nil {
|
||
log.Printf("Phase 1 fehlgeschlagen: %v", err)
|
||
}
|
||
|
||
sg.logGroundTruth("END_PHASE_BW", W0.Host, W1.Host, "HIGH_BW", false)
|
||
if fixedPhase >= 0 {
|
||
phase = 5
|
||
} else {
|
||
phase = 1
|
||
}
|
||
|
||
case 1:
|
||
log.Println("PHASE 2: High IOPS (W0 -> W2)")
|
||
sg.logGroundTruth("START_PHASE_IOPS", W0.Host, W2.Host, "HIGH_IOPS", false)
|
||
|
||
var iopsWg sync.WaitGroup
|
||
for i := 1; i <= 20; i++ {
|
||
select {
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
default:
|
||
}
|
||
|
||
iopsWg.Add(1)
|
||
go func(fileIdx int) {
|
||
defer iopsWg.Done()
|
||
if err := sg.triggerAndWait(ctx, JobRequest{
|
||
SourceWorker: W0,
|
||
DestWorker: W2,
|
||
FileURI: fmt.Sprintf("local_sync_source/small_%d.dat", (fileIdx%10)+1),
|
||
Description: fmt.Sprintf("Thesis Phase 2: File %d", fileIdx),
|
||
WorkloadType: "HIGH_IOPS",
|
||
}); err != nil {
|
||
log.Printf("IOPS Job %d fehlgeschlagen: %v", fileIdx, err)
|
||
}
|
||
}(i)
|
||
|
||
time.Sleep(200 * time.Millisecond)
|
||
}
|
||
|
||
iopsWg.Wait()
|
||
|
||
sg.logGroundTruth("END_PHASE_IOPS", W0.Host, W2.Host, "HIGH_IOPS", false)
|
||
if fixedPhase >= 0 {
|
||
phase = 5
|
||
} else {
|
||
phase = 2
|
||
}
|
||
|
||
case 2:
|
||
log.Println("PHASE 3: Interferenz (W1 -> W0)")
|
||
sg.logGroundTruth("START_PHASE_INTERFERENCE", W1.Host, W0.Host, "INTERFERENCE", false)
|
||
|
||
if err := sg.triggerAndWait(ctx, JobRequest{
|
||
SourceWorker: W1,
|
||
DestWorker: W0,
|
||
FileURI: "local_sync_source/1g.mxf",
|
||
Description: "Thesis Phase 3: Interference",
|
||
WorkloadType: "INTERFERENCE",
|
||
}); err != nil {
|
||
log.Printf("Phase 3 fehlgeschlagen: %v", err)
|
||
}
|
||
|
||
sg.cleanupRemote()
|
||
sg.logGroundTruth("END_PHASE_INTERFERENCE", W1.Host, W0.Host, "INTERFERENCE", false)
|
||
if fixedPhase >= 0 {
|
||
phase = 5
|
||
} else {
|
||
phase = 3
|
||
}
|
||
|
||
case 3:
|
||
log.Println("PHASE 4: Batch Transfer OUT (W0 -> W2)")
|
||
sg.logGroundTruth("START_PHASE_BATCH_OUT", W0.Host, W2.Host, "BATCH_IOPS_OUT", false)
|
||
|
||
if err := sg.triggerAndWait(ctx, JobRequest{
|
||
SourceWorker: W0,
|
||
DestWorker: W2,
|
||
FileURIs: localSyncSeries("small_", 1, 20),
|
||
Description: "Thesis Phase 4: Bulk Transfer Out",
|
||
WorkloadType: "BATCH_IOPS_OUT",
|
||
}); err != nil {
|
||
log.Printf("Batch Out fehlgeschlagen: %v", err)
|
||
}
|
||
|
||
sg.logGroundTruth("END_PHASE_BATCH_OUT", W0.Host, W2.Host, "BATCH_IOPS_OUT", false)
|
||
if fixedPhase >= 0 {
|
||
phase = 5
|
||
} else {
|
||
phase = 4
|
||
}
|
||
|
||
case 4:
|
||
log.Println("PHASE 5: Batch Transfer IN (W1 -> W0)")
|
||
sg.logGroundTruth("START_PHASE_BATCH_IN", W1.Host, W0.Host, "BATCH_IOPS_IN", false)
|
||
|
||
if err := sg.triggerAndWait(ctx, JobRequest{
|
||
SourceWorker: W1,
|
||
DestWorker: W0,
|
||
FileURIs: localSyncSeries("small_", 1, 20),
|
||
Description: "Thesis Phase 5: Bulk Transfer In",
|
||
WorkloadType: "BATCH_IOPS_IN",
|
||
}); err != nil {
|
||
log.Printf("Batch In fehlgeschlagen: %v", err)
|
||
}
|
||
|
||
sg.cleanupRemote()
|
||
sg.logGroundTruth("END_PHASE_BATCH_IN", W1.Host, W0.Host, "BATCH_IOPS_IN", false)
|
||
if fixedPhase >= 0 {
|
||
phase = 5
|
||
} else {
|
||
phase = 5
|
||
}
|
||
|
||
case 5:
|
||
if sg.config.Mode == ModeTesting && sg.shouldGenerateBurst() {
|
||
sg.runBurstWorkload(ctx)
|
||
} else {
|
||
log.Println("PHASE 6: IDLE (Baseline)")
|
||
sg.logGroundTruth("IDLE", "ALL", "ALL", "IDLE", false)
|
||
select {
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
case <-time.After(60 * time.Second):
|
||
}
|
||
}
|
||
|
||
phase = max(fixedPhase, 0)
|
||
sg.cleanupRemote()
|
||
log.Println("Zyklus abgeschlossen, starte von vorne...")
|
||
}
|
||
}
|
||
}
|
||
|
||
func (sg *ScenarioGenerator) RunExperiment(ctx context.Context) error {
|
||
exp := sg.config.Experiment
|
||
|
||
scenarios := exp.Scenarios
|
||
if len(scenarios) == 0 {
|
||
for _, s := range sg.sm.GetScenarios() {
|
||
scenarios = append(scenarios, s.ID)
|
||
}
|
||
}
|
||
|
||
totalDuration := exp.BaselineDuration + time.Duration(len(scenarios))*(exp.ChaosDuration+exp.CooldownDuration)
|
||
log.Printf("Experiment gestartet. Geschätzte Gesamtdauer: %.1f Stunden", totalDuration.Hours())
|
||
log.Printf(" Baseline: %.0f Min | Chaos/Szenario: %.0f Min | Cooldown: %.0f Min | Szenarien: %d",
|
||
exp.BaselineDuration.Minutes(), exp.ChaosDuration.Minutes(), exp.CooldownDuration.Minutes(), len(scenarios))
|
||
|
||
log.Printf(" PHASE 1: BASELINE (%.0f Min)", exp.BaselineDuration.Minutes())
|
||
|
||
sg.logGroundTruth("EXPERIMENT_BASELINE_START", "ALL", "ALL", "EXPERIMENT", false)
|
||
|
||
baselineCtx, baselineCancel := context.WithTimeout(ctx, exp.BaselineDuration)
|
||
|
||
if err := sg.Run(baselineCtx); err != nil && err != context.DeadlineExceeded && err != context.Canceled {
|
||
baselineCancel()
|
||
return fmt.Errorf("baseline fehlgeschlagen: %w", err)
|
||
}
|
||
baselineCancel()
|
||
|
||
sg.logGroundTruth("EXPERIMENT_BASELINE_END", "ALL", "ALL", "EXPERIMENT", false)
|
||
log.Println("Baseline abgeschlossen. 10s Pause vor Chaos-Phase...")
|
||
select {
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
case <-time.After(10 * time.Second):
|
||
}
|
||
|
||
log.Printf(" PHASE 2: CHAOS INJEKTION (%d Szenarien)", len(scenarios))
|
||
|
||
chaosWorkloadCtx, chaosWorkloadCancel := context.WithCancel(ctx)
|
||
defer chaosWorkloadCancel()
|
||
|
||
workloadDone := make(chan error, 1)
|
||
go func() {
|
||
workloadDone <- sg.Run(chaosWorkloadCtx)
|
||
}()
|
||
|
||
for i, scenarioID := range scenarios {
|
||
select {
|
||
case <-ctx.Done():
|
||
sg.sm.StopScenario()
|
||
chaosWorkloadCancel()
|
||
<-workloadDone
|
||
return ctx.Err()
|
||
default:
|
||
}
|
||
|
||
log.Printf("[%d/%d] CHAOS START: %s", i+1, len(scenarios), scenarioID)
|
||
sg.logGroundTruth("EXPERIMENT_CHAOS_START", "Orchestrator", "System", scenarioID, true)
|
||
|
||
if err := sg.sm.StartScenario(scenarioID); err != nil {
|
||
log.Printf("Fehler beim Starten von %s: %v – überspringe", scenarioID, err)
|
||
continue
|
||
}
|
||
|
||
select {
|
||
case <-ctx.Done():
|
||
sg.sm.StopScenario()
|
||
chaosWorkloadCancel()
|
||
<-workloadDone
|
||
return ctx.Err()
|
||
case <-time.After(exp.ChaosDuration):
|
||
}
|
||
|
||
sg.sm.StopScenario()
|
||
sg.logGroundTruth("EXPERIMENT_CHAOS_END", "Orchestrator", "System", scenarioID, true)
|
||
log.Printf("[%d/%d] CHAOS STOP: %s | Cooldown: %.0f Min", i+1, len(scenarios), scenarioID, exp.CooldownDuration.Minutes())
|
||
|
||
select {
|
||
case <-ctx.Done():
|
||
chaosWorkloadCancel()
|
||
<-workloadDone
|
||
return ctx.Err()
|
||
case <-time.After(exp.CooldownDuration):
|
||
}
|
||
}
|
||
|
||
chaosWorkloadCancel()
|
||
<-workloadDone
|
||
|
||
log.Println(" EXPERIMENT ABGESCHLOSSEN")
|
||
sg.logGroundTruth("EXPERIMENT_END", "ALL", "ALL", "EXPERIMENT", false)
|
||
|
||
return nil
|
||
}
|
||
|
||
func (sg *ScenarioGenerator) startAPIServer(ctx context.Context, cancelFunc context.CancelFunc) {
|
||
mux := http.NewServeMux()
|
||
|
||
mux.HandleFunc("/scenarios", func(w http.ResponseWriter, r *http.Request) {
|
||
if r.Method != http.MethodGet {
|
||
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||
return
|
||
}
|
||
scenarios := sg.sm.GetScenarios()
|
||
w.Header().Set("Content-Type", "application/json")
|
||
json.NewEncoder(w).Encode(scenarios)
|
||
})
|
||
|
||
mux.HandleFunc("/scenarios/start", func(w http.ResponseWriter, r *http.Request) {
|
||
if r.Method != http.MethodPost {
|
||
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||
return
|
||
}
|
||
id := r.URL.Query().Get("id")
|
||
if id == "" {
|
||
http.Error(w, "Missing id parameter", http.StatusBadRequest)
|
||
return
|
||
}
|
||
if err := sg.sm.StartScenario(id); err != nil {
|
||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||
return
|
||
}
|
||
fmt.Fprintf(w, "Scenario %s started", id)
|
||
})
|
||
|
||
mux.HandleFunc("/scenarios/stop", func(w http.ResponseWriter, r *http.Request) {
|
||
if r.Method != http.MethodPost {
|
||
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||
return
|
||
}
|
||
sg.sm.StopScenario()
|
||
fmt.Fprintf(w, "Scenario stopped")
|
||
})
|
||
|
||
server := &http.Server{Addr: ":" + APIPort, Handler: mux}
|
||
|
||
go func() {
|
||
log.Printf("API Server gestartet auf Port %s", APIPort)
|
||
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||
log.Printf("API Server Fehler: %v", err)
|
||
cancelFunc()
|
||
}
|
||
}()
|
||
|
||
go func() {
|
||
<-ctx.Done()
|
||
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||
defer cancel()
|
||
server.Shutdown(shutdownCtx)
|
||
}()
|
||
}
|
||
|
||
func main() {
|
||
log.SetFlags(log.LstdFlags | log.Lshortfile)
|
||
|
||
mode := flag.String("mode", "training", "Szenario-Modus: training, testing, anomaly-only")
|
||
worker := flag.String("worker", "thesis-worker-0", "Überwachter Worker")
|
||
burst := flag.Bool("burst", true, "Burst-Traffic aktivieren")
|
||
maintenance := flag.Bool("maintenance", true, "Wartungsfenster aktivieren")
|
||
anomalyRate := flag.Float64("anomaly-rate", 0.1, "Anomalie-Rate im Testing-Mode")
|
||
inventoryPath := flag.String("inventory", "inventory.ini", "Pfad zur Ansible inventory.ini")
|
||
|
||
runExperiment := flag.Bool("run-experiment", false, "Vollständiges Experiment ausführen (Baseline + Chaos)")
|
||
baselineHours := flag.Float64("baseline-hours", 6, "Dauer der Baseline-Phase in Stunden")
|
||
chaosMins := flag.Float64("chaos-mins", 20, "Dauer jedes aktiven Chaos-Szenarios in Minuten")
|
||
cooldownMins := flag.Float64("cooldown-mins", 20, "Cooldown-Dauer zwischen Szenarien in Minuten")
|
||
workload := flag.String("workload", "all", "Workload-Filter: all | high-bw | high-iops | interference | batch-out | batch-in | idle")
|
||
|
||
flag.Parse()
|
||
|
||
config := Config{
|
||
Mode: ScenarioMode(*mode),
|
||
MonitoredWorker: *worker,
|
||
EnableBurst: *burst,
|
||
EnableMaintenance: *maintenance,
|
||
AnomalyRate: *anomalyRate,
|
||
RunExperiment: *runExperiment,
|
||
WorkloadFilter: WorkloadFilter(*workload),
|
||
Experiment: ExperimentConfig{
|
||
BaselineDuration: time.Duration(*baselineHours * float64(time.Hour)),
|
||
ChaosDuration: time.Duration(*chaosMins * float64(time.Minute)),
|
||
CooldownDuration: time.Duration(*cooldownMins * float64(time.Minute)),
|
||
},
|
||
}
|
||
|
||
if config.Mode != ModeTraining && config.Mode != ModeTesting && config.Mode != ModeAnomalyOnly {
|
||
log.Fatalf("Ungültiger Modus: %s", config.Mode)
|
||
}
|
||
|
||
validWorkloads := map[WorkloadFilter]bool{
|
||
WorkloadAll: true, WorkloadHighBW: true, WorkloadHighIOPS: true,
|
||
WorkloadInterference: true, WorkloadBatchOut: true, WorkloadBatchIn: true,
|
||
WorkloadIdle: true,
|
||
}
|
||
if !validWorkloads[config.WorkloadFilter] {
|
||
log.Fatalf("Ungültiger Workload-Filter: %s (erlaubt: all, high-bw, high-iops, interference, batch-out, batch-in, idle)", config.WorkloadFilter)
|
||
}
|
||
|
||
log.Println("Thesis Scenario Generator gestartet")
|
||
log.Printf(" Modus: %s | Experiment: %v | Workload: %s", config.Mode, config.RunExperiment, config.WorkloadFilter)
|
||
|
||
inv, err := ParseInventory(*inventoryPath)
|
||
if err != nil {
|
||
log.Fatalf("Fehler beim Parsen des Inventory: %v", err)
|
||
}
|
||
|
||
generator, err := NewScenarioGenerator(config, inv)
|
||
if err != nil {
|
||
log.Fatalf("Fehler beim Initialisieren des Generators: %v", err)
|
||
}
|
||
|
||
sm, err := NewScenarioManager(inv, generator.logGroundTruth)
|
||
if err != nil {
|
||
log.Fatalf("Fehler beim Initialisieren des Scenario Managers: %v", err)
|
||
}
|
||
generator.sm = sm
|
||
|
||
defer func() {
|
||
if err := generator.Close(); err != nil {
|
||
log.Printf("Fehler beim Schließen: %v", err)
|
||
}
|
||
sm.StopScenario()
|
||
}()
|
||
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
defer cancel()
|
||
|
||
generator.startAPIServer(ctx, cancel)
|
||
|
||
sigChan := make(chan os.Signal, 1)
|
||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||
|
||
errChan := make(chan error, 1)
|
||
go func() {
|
||
if config.RunExperiment {
|
||
errChan <- generator.RunExperiment(ctx)
|
||
} else {
|
||
errChan <- generator.Run(ctx)
|
||
}
|
||
}()
|
||
|
||
select {
|
||
case sig := <-sigChan:
|
||
log.Printf("Signal %v empfangen, fahre graceful herunter...", sig)
|
||
cancel()
|
||
done := make(chan struct{})
|
||
go func() {
|
||
generator.wg.Wait()
|
||
close(done)
|
||
}()
|
||
select {
|
||
case <-done:
|
||
log.Println("Alle Worker beendet")
|
||
case <-time.After(30 * time.Second):
|
||
log.Println("Shutdown Timeout erreicht")
|
||
}
|
||
|
||
case err := <-errChan:
|
||
if err != nil && err != context.Canceled && err != context.DeadlineExceeded {
|
||
log.Fatalf("Generator-Fehler: %v", err)
|
||
}
|
||
}
|
||
|
||
log.Println("Scenario Generator beendet")
|
||
}
|