package detect import ( "log" "sync" "time" "codeberg.org/pata1704/guenther/pkg/types" ) // ScalingLevel represents the current detector complexity level. type ScalingLevel int const ( LevelNormal ScalingLevel = iota // SEAD Ensemble (full accuracy) LevelHigh // COPOD (reduced complexity) LevelCritical // MAD (minimal overhead) ) // levelName maps ScalingLevel to a human-readable string for logging. var levelName = map[ScalingLevel]string{ LevelNormal: "SEAD Ensemble (Normal)", LevelHigh: "COPOD (High Load)", LevelCritical: "MAD (Critical Load)", } // ── SwitchableDetector ─────────────────────────────────────────────────────── // SwitchableDetector wraps a SEADDetector and allows runtime switching to // lighter-weight sub-detectors (COPOD, MAD) under high CPU load. // // State consistency guarantee: all base detectors are kept up-to-date // regardless of which one is currently active. This ensures a clean // transition back to SEAD without stale internal state. // // Update-deduplication contract: // // SEAD.Score() calls d.Score() on every base detector, which self-updates. // → no separate Update() call needed; doing so would double-count. // SEAD.Update() calls d.Update() on every base detector directly. // → used here when we need to advance inactive detectors // without scoring through SEAD. // // For LevelHigh / LevelCritical we call: // // s.ensemble.Update(vector) → advances MAD, RRCF variants via d.Update() // COPOD.Update() = COPOD.update() (buffer append only) // active.Score(vector) → scores + self-updates the active detector // (COPOD.Score calls update internally again) // // This means COPOD receives one Update() + one self-update from Score() per tick. // That is intentional: Update() appends to the sliding window buffer; Score() // computes the copula and then appends the scored point (score-then-insert). // The two operations are not idempotent and must both run for correct behaviour. // RRCF and MAD are updated via SEAD.Update() only; their Score() methods are // not called when inactive so they do not double-count. type SwitchableDetector struct { mu sync.RWMutex ensemble *SEADDetector copod AnomalyDetector // may be nil if COPOD is not configured mad AnomalyDetector // may be nil if MAD is not configured activeLevel ScalingLevel } // NewSwitchableDetector creates a SwitchableDetector backed by the given // SEADDetector. COPOD and MAD sub-detectors are extracted from the ensemble // for direct access during high-load switching. // // If a sub-detector is not present in the ensemble, the corresponding field // is nil and Score() falls back to the ensemble for that level. func NewSwitchableDetector(ensemble *SEADDetector) *SwitchableDetector { return &SwitchableDetector{ ensemble: ensemble, copod: ensemble.GetDetector("COPOD"), mad: ensemble.GetDetector("MAD"), activeLevel: LevelNormal, } } // Fit trains all underlying detectors on the given baseline vectors. func (s *SwitchableDetector) Fit(vectors []types.FeatureVector) error { return s.ensemble.Fit(vectors) } // Update advances the internal state of all base detectors without scoring. // Safe for concurrent use. func (s *SwitchableDetector) Update(vector types.FeatureVector) error { return s.ensemble.Update(vector) } // Score returns an AnomalyResult from the currently active detector. // // All inactive detectors are kept current via SEAD.Update() so that // switching back to a heavier detector does not produce stale scores. // Safe for concurrent use. func (s *SwitchableDetector) Score(vector types.FeatureVector) (types.AnomalyResult, error) { s.mu.RLock() level := s.activeLevel s.mu.RUnlock() // LevelNormal: SEAD.Score() handles everything internally. // It scores all base detectors (which self-update) and applies // MWU weight adaptation. No separate Update() needed. if level == LevelNormal { return s.ensemble.Score(vector) } // LevelHigh / LevelCritical: // 1. Advance all base detectors via SEAD.Update() so inactive detectors // (MAD, RRCF variants for LevelHigh; RRCF, COPOD for LevelCritical) // maintain current state. SEAD weight adaptation is NOT performed here // because we are bypassing SEAD.Score(). if err := s.ensemble.Update(vector); err != nil { // Non-fatal: log and continue. A single missed update is acceptable; // the detector will resync on the next tick. log.Printf("scaling: ensemble update error at level %s: %v", levelName[level], err) } // 2. Score via the active sub-detector. // COPOD.Score() additionally self-updates (score-then-insert), which is // correct and complementary to the Update() call above (see type doc). // MAD.Update() internally calls Score(), so it is already current after // the SEAD.Update() call; MAD.Score() here is pure scoring only. switch level { case LevelHigh: if s.copod == nil { log.Printf("scaling: COPOD unavailable at LevelHigh, falling back to ensemble") return s.ensemble.Score(vector) } res, err := s.copod.Score(vector) if err != nil { return res, err } res.Method = "COPOD (High Load)" return res, nil case LevelCritical: if s.mad == nil { log.Printf("scaling: MAD unavailable at LevelCritical, falling back to ensemble") return s.ensemble.Score(vector) } res, err := s.mad.Score(vector) if err != nil { return res, err } res.Method = "MAD (Critical Load)" return res, nil default: return s.ensemble.Score(vector) } } // Switch atomically changes the active detection level. // It is a no-op if the requested level equals the current level. // Safe for concurrent use. func (s *SwitchableDetector) Switch(level ScalingLevel) { s.mu.Lock() defer s.mu.Unlock() if s.activeLevel == level { return } log.Printf("[SCALING] %s → %s", levelName[s.activeLevel], levelName[level]) s.activeLevel = level } // ── ScalingController ──────────────────────────────────────────────────────── // ScalingController monitors CPU load and drives a SwitchableDetector through // its scaling levels (Normal → High → Critical and back). // // Level transitions follow a two-phase commit pattern: // // 1. A CPU measurement moves the desired level to a "pending" state. // 2. Only after the pending level has been stable for the configured // duration is Switch() called on the detector. // // This prevents rapid oscillation under bursty workloads. // // Hysteresis rules (in the dead-band between downThres and highThres): // // Critical → High (one step down, not straight to Normal) // High → High (stays until CPU drops below downThres) // Normal → Normal // // ScalingController is not safe for concurrent use. ObserveCPU must be // called from a single goroutine (the DetectionLayer's processing loop). type ScalingController struct { detector *SwitchableDetector // Thresholds (CPU percent, 0–100) highThres float64 critThres float64 downThres float64 // Required stable duration before a level transition is committed. highDur time.Duration critDur time.Duration downDur time.Duration // currentLevel is the level that has been committed to the detector. currentLevel ScalingLevel // pendingLevel is the desired level based on recent CPU measurements. // It must remain stable for the corresponding duration before becoming current. pendingLevel ScalingLevel // pendingStart is the time at which pendingLevel last changed. // The pending level is committed when time.Since(pendingStart) >= required duration. pendingStart time.Time } // NewScalingController constructs a ScalingController. // Duration arguments are in seconds (float64 to match YAML config values). func NewScalingController( detector *SwitchableDetector, highThres, critThres, downThres float64, highDurSec, critDurSec, downDurSec float64, ) *ScalingController { return &ScalingController{ detector: detector, highThres: highThres, critThres: critThres, downThres: downThres, highDur: time.Duration(highDurSec * float64(time.Second)), critDur: time.Duration(critDurSec * float64(time.Second)), downDur: time.Duration(downDurSec * float64(time.Second)), currentLevel: LevelNormal, pendingLevel: LevelNormal, pendingStart: time.Now(), // explicit init avoids zero-time edge case } } // ObserveCPU processes a single CPU measurement and, if warranted, triggers // a level switch on the underlying SwitchableDetector. // // Must be called from a single goroutine only (not safe for concurrent use). func (c *ScalingController) ObserveCPU(cpuPercent float64) { now := time.Now() desired := c.desiredLevel(cpuPercent) // Phase 1: desired level changed → restart the stability timer. if desired != c.pendingLevel { c.pendingLevel = desired c.pendingStart = now return } // Phase 2: desired level has been stable – check if duration is met. if now.Sub(c.pendingStart) < c.durationFor(desired) { return } if desired != c.currentLevel { c.currentLevel = desired c.detector.Switch(desired) } c.pendingStart = now } // desiredLevel computes the target ScalingLevel for a given CPU measurement, // applying hysteresis in the dead-band between downThres and highThres. func (c *ScalingController) desiredLevel(cpuPercent float64) ScalingLevel { switch { case cpuPercent > c.critThres: return LevelCritical case cpuPercent > c.highThres: return LevelHigh case cpuPercent < c.downThres: return LevelNormal default: // Dead-band: degrade at most one step to avoid jumping straight // from Critical to Normal on a brief CPU dip. switch c.currentLevel { case LevelCritical: return LevelHigh case LevelHigh: return LevelHigh default: return LevelNormal } } } // durationFor returns the required stable duration for a given target level. func (c *ScalingController) durationFor(level ScalingLevel) time.Duration { switch level { case LevelCritical: return c.critDur case LevelHigh: return c.highDur default: return c.downDur } }