guenther/internal/detect/scaling.go

299 lines
10 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package detect
import (
"log"
"sync"
"time"
"codeberg.org/pata1704/guenther/pkg/types"
)
// ScalingLevel identifies which detector complexity tier is in use.
type ScalingLevel int

// Scaling tiers, ordered from the most accurate detector to the cheapest one.
const (
	// LevelNormal selects the SEAD ensemble (full accuracy).
	LevelNormal ScalingLevel = iota
	// LevelHigh selects COPOD (reduced complexity).
	LevelHigh
	// LevelCritical selects MAD (minimal overhead).
	LevelCritical
)

// levelName holds the human-readable label used in log output for each
// ScalingLevel. Looking up a level that is not listed here yields "".
var levelName = map[ScalingLevel]string{
	LevelNormal:   "SEAD Ensemble (Normal)",
	LevelHigh:     "COPOD (High Load)",
	LevelCritical: "MAD (Critical Load)",
}
// ── SwitchableDetector ───────────────────────────────────────────────────────

// SwitchableDetector wraps a SEADDetector and lets callers switch, at runtime,
// to one of its lighter-weight sub-detectors (COPOD, MAD) when CPU load is
// high.
//
// State consistency: every base detector is meant to be kept up to date no
// matter which one is currently active, so that switching back to SEAD does
// not resume from stale internal state.
//
// Update-deduplication contract. The scheme relies on the following behaviour
// of the underlying detectors, which are defined elsewhere in the package:
//
//   - SEAD.Score() calls Score() on every base detector, and each of those
//     self-updates. A separate Update() call is therefore not needed and
//     would double-count the sample.
//   - SEAD.Update() calls Update() on every base detector directly. It is
//     used when inactive detectors must advance without scoring through
//     SEAD.
//
// At LevelHigh / LevelCritical, Score therefore performs two steps per tick:
//
//  1. s.ensemble.Update(vector) advances MAD, the RRCF variants and COPOD
//     through their Update() methods (for COPOD this is a buffer append
//     only).
//  2. active.Score(vector) scores with the active detector, which also
//     self-updates (COPOD.Score appends the scored point again).
//
// COPOD consequently sees one Update() plus one self-update from Score() per
// tick. That is intentional: Update() appends to the sliding-window buffer,
// while Score() computes the copula and then inserts the scored point
// (score-then-insert). The two operations are not idempotent and both must
// run. Detectors that are inactive are advanced through SEAD.Update() only;
// their Score() methods are not called, so they do not double-count.
type SwitchableDetector struct {
	// mu guards activeLevel.
	mu sync.RWMutex
	// ensemble is the full SEAD detector; it owns all base detectors.
	ensemble *SEADDetector
	// copod is the detector used at LevelHigh; nil if COPOD is not configured.
	copod AnomalyDetector
	// mad is the detector used at LevelCritical; nil if MAD is not configured.
	mad AnomalyDetector
	// activeLevel is the tier Score currently dispatches to.
	activeLevel ScalingLevel
}
// NewSwitchableDetector builds a SwitchableDetector on top of the given
// SEADDetector, starting at LevelNormal.
//
// The COPOD and MAD sub-detectors are looked up in the ensemble by name
// ("COPOD", "MAD") so they can be scored directly while a reduced level is
// active. A sub-detector that the ensemble does not provide leaves the
// corresponding field nil, in which case Score falls back to the ensemble
// for that level.
//
// ensemble must be non-nil: it is dereferenced here to perform the lookups.
func NewSwitchableDetector(ensemble *SEADDetector) *SwitchableDetector {
	sd := &SwitchableDetector{
		ensemble:    ensemble,
		activeLevel: LevelNormal,
	}
	sd.copod = ensemble.GetDetector("COPOD")
	sd.mad = ensemble.GetDetector("MAD")
	return sd
}
// Fit trains the wrapped ensemble on the given baseline vectors and returns
// whatever error the ensemble reports. The call is forwarded unchanged; the
// active scaling level plays no role here.
func (s *SwitchableDetector) Fit(vectors []types.FeatureVector) error {
	return s.ensemble.Fit(vectors)
}
// Update feeds vector to the wrapped ensemble's Update method, advancing the
// base detectors without producing a score, and returns the ensemble's error.
//
// This method takes no lock of its own; it only forwards the call.
// NOTE(review): the original comment declares it safe for concurrent use,
// which therefore depends on SEADDetector.Update being safe — confirm.
func (s *SwitchableDetector) Update(vector types.FeatureVector) error {
	return s.ensemble.Update(vector)
}
// Score returns an AnomalyResult from the detector that serves the currently
// active scaling level.
//
// Dispatch:
//
//   - LevelNormal: the ensemble scores the vector. Per the update contract
//     described on SwitchableDetector, SEAD.Score() already updates every
//     base detector, so no separate Update() is issued.
//   - LevelHigh: COPOD scores the vector; the result's Method is set to
//     "COPOD (High Load)".
//   - LevelCritical: MAD scores the vector; the result's Method is set to
//     "MAD (Critical Load)".
//
// If the sub-detector for the active level is not configured (nil), or the
// level is unknown, the ensemble scores the vector instead. On these fallback
// paths the ensemble is NOT updated beforehand: ensemble.Score() self-updates
// the base detectors, so a preceding ensemble.Update() would feed the same
// sample twice.
//
// When a sub-detector does serve the request, ensemble.Update() is called
// first so that the inactive detectors keep tracking the stream. An error
// from that update is logged and otherwise ignored. An error from the active
// detector's Score is returned together with whatever result it produced.
//
// The active level is read under the read lock; the lock is released before
// any detector is invoked.
func (s *SwitchableDetector) Score(vector types.FeatureVector) (types.AnomalyResult, error) {
	s.mu.RLock()
	level := s.activeLevel
	s.mu.RUnlock()

	// Resolve which sub-detector (if any) serves this level before touching
	// the ensemble, so that the fallback paths below never combine
	// ensemble.Update() with ensemble.Score().
	var (
		active AnomalyDetector
		method string
	)
	switch level {
	case LevelHigh:
		if s.copod == nil {
			log.Printf("scaling: COPOD unavailable at LevelHigh, falling back to ensemble")
		}
		active, method = s.copod, "COPOD (High Load)"
	case LevelCritical:
		if s.mad == nil {
			log.Printf("scaling: MAD unavailable at LevelCritical, falling back to ensemble")
		}
		active, method = s.mad, "MAD (Critical Load)"
	}

	// LevelNormal, an unknown level, or a missing sub-detector: the ensemble
	// scores and self-updates all base detectors in one step.
	if active == nil {
		return s.ensemble.Score(vector)
	}

	// Step 1: advance all base detectors so the inactive ones stay current.
	// SEAD weight adaptation is not performed here because SEAD.Score() is
	// bypassed.
	if err := s.ensemble.Update(vector); err != nil {
		// Non-fatal: a single missed update is acceptable; the detector is
		// expected to resync on the next tick.
		log.Printf("scaling: ensemble update error at level %s: %v", levelName[level], err)
	}

	// Step 2: score with the active sub-detector. For COPOD this includes its
	// own score-then-insert self-update, which is intended to complement the
	// Update() call above.
	// NOTE(review): the previous comment here said MAD.Score() is pure
	// scoring, while the type documentation says every base detector's
	// Score() self-updates — confirm which holds for MAD at LevelCritical.
	res, err := active.Score(vector)
	if err != nil {
		return res, err
	}
	res.Method = method
	return res, nil
}
// Switch sets the active detection level under the write lock.
//
// When the requested level differs from the current one, the transition is
// logged and the new level stored. When it is the same, nothing is logged
// and nothing changes. Safe for concurrent use with Score and with other
// Switch calls, since all access to activeLevel goes through s.mu.
func (s *SwitchableDetector) Switch(level ScalingLevel) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if prev := s.activeLevel; prev != level {
		log.Printf("[SCALING] %s → %s", levelName[prev], levelName[level])
		s.activeLevel = level
	}
}
// ── ScalingController ────────────────────────────────────────────────────────

// ScalingController watches CPU load and moves a SwitchableDetector between
// its scaling levels (Normal → High → Critical and back).
//
// A level change is committed in two steps:
//
//  1. A CPU measurement that asks for a different level records that level
//     as "pending" and starts a stability timer.
//  2. Switch() is called on the detector only once the pending level has
//     stayed unchanged for the duration configured for that level.
//
// This keeps bursty workloads from causing rapid oscillation.
//
// Hysteresis in the dead-band (CPU neither above highThres nor below
// downThres) depends on the committed level:
//
//	Critical → High   (one step down, never straight to Normal)
//	High     → High   (held until CPU leaves the dead-band)
//	Normal   → Normal
//
// ScalingController is not safe for concurrent use. ObserveCPU must be called
// from a single goroutine (the DetectionLayer's processing loop).
type ScalingController struct {
	// detector is the SwitchableDetector whose level is being driven.
	detector *SwitchableDetector

	// Thresholds (CPU percent, 0-100).
	highThres float64 // above this: LevelHigh is requested
	critThres float64 // above this: LevelCritical is requested
	downThres float64 // below this: LevelNormal is requested

	// Stable time required before a transition to the respective target
	// level is committed; downDur applies when the target is LevelNormal.
	highDur time.Duration
	critDur time.Duration
	downDur time.Duration

	// currentLevel is the level that has been committed to the detector.
	currentLevel ScalingLevel

	// pendingLevel is the level requested by the most recent CPU
	// measurements. It must stay unchanged for the matching duration before
	// it becomes currentLevel.
	pendingLevel ScalingLevel

	// pendingStart marks the start of the current stability window. It is
	// set when pendingLevel changes and reset by ObserveCPU each time a
	// full window has elapsed.
	pendingStart time.Time
}
// NewScalingController returns a ScalingController that drives detector.
//
// highThres, critThres and downThres are stored as given; no ordering or
// range validation is performed. The three duration arguments are expressed
// in seconds (float64, to match YAML config values) and are converted to
// time.Duration. The controller starts with both the committed and the
// pending level at LevelNormal and the stability timer set to the current
// time, which avoids a zero-valued pendingStart.
func NewScalingController(
	detector *SwitchableDetector,
	highThres, critThres, downThres float64,
	highDurSec, critDurSec, downDurSec float64,
) *ScalingController {
	// seconds converts a fractional number of seconds into a time.Duration.
	seconds := func(sec float64) time.Duration {
		return time.Duration(sec * float64(time.Second))
	}

	ctrl := &ScalingController{
		detector:     detector,
		highThres:    highThres,
		critThres:    critThres,
		downThres:    downThres,
		highDur:      seconds(highDurSec),
		critDur:      seconds(critDurSec),
		downDur:      seconds(downDurSec),
		currentLevel: LevelNormal,
		pendingLevel: LevelNormal,
	}
	ctrl.pendingStart = time.Now()
	return ctrl
}
// ObserveCPU feeds one CPU measurement (in percent) into the controller and
// switches the underlying SwitchableDetector when a transition is due.
//
// The level requested by the measurement is compared with the pending level:
//
//   - If it differs, it becomes the new pending level, the stability timer
//     restarts, and nothing else happens for this observation. A transition
//     therefore never commits on the same call that first requested it.
//   - If it matches and the timer has run for at least the duration
//     configured for that level, the level is committed (Switch is called
//     only when it differs from the committed level) and the timer is reset.
//
// Must be called from a single goroutine only (not safe for concurrent use).
func (c *ScalingController) ObserveCPU(cpuPercent float64) {
	now := time.Now()
	target := c.desiredLevel(cpuPercent)

	// Step 1: the requested level changed; restart the stability window.
	if target != c.pendingLevel {
		c.pendingLevel, c.pendingStart = target, now
		return
	}

	// Step 2: the requested level is stable; act only once it has been stable
	// for long enough.
	if held := now.Sub(c.pendingStart); held >= c.durationFor(target) {
		if target != c.currentLevel {
			c.currentLevel = target
			c.detector.Switch(target)
		}
		c.pendingStart = now
	}
}
// desiredLevel maps a CPU measurement to the ScalingLevel it asks for.
//
// The checks run in this order: above critThres → LevelCritical; above
// highThres → LevelHigh; below downThres → LevelNormal. Any other value
// (including one exactly equal to highThres or downThres) falls into the
// dead-band, where the answer depends on the committed level: Critical and
// High both yield LevelHigh, everything else yields LevelNormal. From
// Critical this means degrading by at most one step, so a brief CPU dip does
// not jump straight to Normal.
func (c *ScalingController) desiredLevel(cpuPercent float64) ScalingLevel {
	if cpuPercent > c.critThres {
		return LevelCritical
	}
	if cpuPercent > c.highThres {
		return LevelHigh
	}
	if cpuPercent < c.downThres {
		return LevelNormal
	}

	// Dead-band: hold High, or step down from Critical to High only.
	if c.currentLevel == LevelCritical || c.currentLevel == LevelHigh {
		return LevelHigh
	}
	return LevelNormal
}
// durationFor returns how long a requested level must remain stable before it
// may be committed. The lookup is keyed on the target level only, not on the
// direction of the change: LevelCritical uses critDur, LevelHigh uses highDur
// (also when stepping down from Critical), and every other level uses
// downDur.
func (c *ScalingController) durationFor(level ScalingLevel) time.Duration {
	if level == LevelCritical {
		return c.critDur
	}
	if level == LevelHigh {
		return c.highDur
	}
	return c.downDur
}