// Package detect provides anomaly detection algorithms and ensemble logic. package detect // sead.go – SEAD: Unsupervised Ensemble of Streaming Anomaly Detectors // // Implementation of Algorithm 1 from: // Shah et al. "SEAD: Unsupervised Ensemble of Streaming Anomaly Detectors" // ICML 2025, Amazon Science. // // Core algorithm (Multiplicative Weights Update / FTRL with KL-divergence): // // 1. For each incoming feature vector x_t: // a. Score every base detector: s̃_i(t) = A_i(x_t) // b. Normalise to [0,1] via streaming quantile: s_i(t) = Q(s̃_i(t); history_i) // c. Compute softmax weights: p_i(t) = exp(w_i) / Σ exp(w_j) // d. Output combined score: S_t = Σ p_i(t) · s_i(t) // e. Update weights: w_i(t+1) = w_i(t) − η · ∂L_t/∂w_i // where L_t = S_t + λ · KL(p || π) // 2. Update each base detector: A_i(t+1) ← Update(A_i(t), x_t) // // Streaming quantiles are approximated via a fixed-capacity sorted circular // buffer (lightweight t-digest substitute). For N=4 detectors at 1 Hz this // is negligible memory and CPU overhead. // // SEAD runs parallel to the existing AVG/MAX/MEDIAN ensemble; it is selected // by setting detector.ensemble.method = "sead" in the config. import ( "fmt" "math" "sort" "strings" "sync" "codeberg.org/pata1704/guenther/pkg/types" ) // ─── FIFO Ring Buffer ───────────────────────────────────────────────────────── // ringBuffer is a fixed-capacity circular buffer with true FIFO eviction. // // Memory: O(cap · 8 bytes). For cap=500 this is 4 KB per detector type ringBuffer struct { data []float64 head int // index of the next write position size int // current number of elements cap int } func newRingBuffer(capacity int) *ringBuffer { if capacity < 10 { capacity = 10 } return &ringBuffer{ data: make([]float64, capacity), cap: capacity, } } // push inserts v, overwriting the oldest entry when the buffer is full. // Returns the empirical quantile rank of v within the current window ∈ [0,1]. func (r *ringBuffer) push(v float64) float64 { r.data[r.head] = v r.head = (r.head + 1) % r.cap if r.size < r.cap { r.size++ } n := r.size if n <= 1 { return 0.5 } sorted := make([]float64, n) for i := range n { sorted[i] = r.data[(r.head-n+i+r.cap)%r.cap] } sort.Float64s(sorted) rank := sort.SearchFloat64s(sorted, v) return float64(rank) / float64(n-1) } // quantileVal returns the value at quantile p ∈ [0,1] without modifying the buffer. func (r *ringBuffer) quantileVal(p float64) float64 { n := r.size if n == 0 { return 0 } sorted := make([]float64, n) for i := range n { sorted[i] = r.data[(r.head-n+i+r.cap)%r.cap] } sort.Float64s(sorted) idx := int(p * float64(n-1)) if idx >= n { idx = n - 1 } return sorted[idx] } // streamQuantile is an alias kept for API compatibility. // New code should use ringBuffer directly. type streamQuantile = ringBuffer func newStreamQuantile(capacity int) *ringBuffer { return newRingBuffer(capacity) } // ─── SEADDetector ───────────────────────────────────────────────────────────── // SEADDetector implements the SEAD algorithm: an unsupervised online ensemble // that adaptively weights N base anomaly detectors using Multiplicative Weights // Update (MWU / FTRL with KL-divergence regulariser). // // Key properties: // - Fully unsupervised: no anomaly labels required. // - O(1) per time step: computational cost does not grow with stream length. // - Adaptive: detector weights shift as data distribution changes. // - Score-scale agnostic: all base scores are quantile-normalised to [0,1] // before aggregation, preventing any single detector from dominating due // to score magnitude differences. // // Configuration: // - eta (η): MWU learning rate. Larger → faster adaptation, more noise. // Recommended range: [0.05, 0.3]. Default: 0.1. // - lambda (λ): KL-divergence regularisation strength. 0 = pure MWU (uniform // prior). Positive values pull weights toward π (uniform). Default: 0.01. // - quantileWindow: number of past scores retained per detector for quantile // normalisation. Default: 300. // - contamination: expected anomaly fraction used to set the decision // threshold as quantile(combinedHistory, 1-contamination). Default: 0.15. // - minDataPoints: minimum scored windows before any anomaly is flagged. type SEADDetector struct { detectors []AnomalyDetector // N base detectors (MAD, RRCF, COPOD, IForest) names []string // human-readable name per detector // MWU state weights []float64 // w_i (log-space, unconstrained) eta float64 // learning rate η lambda float64 // KL regularisation strength λ prior []float64 // π – uniform by default // Streaming quantile per detector quantiles []*streamQuantile // Combined score history for threshold computation // Uses a FIFO ring buffer (capacity: historySize) so every score lives // exactly historySize time steps, regardless of its magnitude. contamination float64 combinedHistory *ringBuffer // FIFO ring buffer, capacity=1000 minDataPoints int mu sync.Mutex } // SEADConfig holds all tunable parameters for the SEAD ensemble. type SEADConfig struct { // Eta is the MWU learning rate η. // Higher values react faster to distribution shifts but are noisier. // Recommended: 0.05–0.20. Default: 0.10. Eta float64 // Lambda is the KL-divergence regularisation strength. // 0 = pure MWU (no penalty for deviation from prior). // Positive values add stability; use 0.01–0.05. Lambda float64 // QuantileWindow is the number of past scores retained per detector. // Larger → more stable quantiles but slower adaptation. // Default: 300. QuantileWindow int // Contamination is the expected anomaly fraction ∈ [0, 0.5). // Sets the decision threshold at quantile(1-contamination) of combined history. // Default: 0.15. Contamination float64 // MinDataPoints is the cold-start guard: anomalies are not flagged until // at least this many windows have been scored. Default: 20. MinDataPoints int } // DefaultSEADConfig returns sensible defaults for the SEAD ensemble. func DefaultSEADConfig() SEADConfig { return SEADConfig{ Eta: 0.10, Lambda: 0.01, QuantileWindow: 300, Contamination: 0.15, MinDataPoints: 20, } } // NewSEADDetector constructs a SEAD ensemble from N base detectors. // // - detectors: slice of base AnomalyDetector implementations. Must be ≥ 1. // - names: human-readable labels for each detector (used in Details field). // - cfg: SEAD tuning parameters (use DefaultSEADConfig() for a safe start). func NewSEADDetector( detectors []AnomalyDetector, names []string, cfg SEADConfig, ) (*SEADDetector, error) { n := len(detectors) if n == 0 { return nil, fmt.Errorf("sead: at least one base detector required") } if len(names) != n { return nil, fmt.Errorf("sead: names length %d must match detectors length %d", len(names), n) } if cfg.Eta <= 0 { cfg.Eta = 0.10 } if cfg.QuantileWindow <= 0 { cfg.QuantileWindow = 300 } if cfg.Contamination <= 0 || cfg.Contamination >= 0.5 { cfg.Contamination = 0.15 } if cfg.MinDataPoints <= 0 { cfg.MinDataPoints = 20 } // Uniform prior π = 1/N for all detectors. prior := make([]float64, n) for i := range prior { prior[i] = 1.0 / float64(n) } // Initialise weights uniformly in log-space: w_i = 0 → softmax = 1/N. weights := make([]float64, n) quantiles := make([]*streamQuantile, n) for i := range quantiles { quantiles[i] = newStreamQuantile(cfg.QuantileWindow) } return &SEADDetector{ detectors: detectors, names: names, weights: weights, eta: cfg.Eta, lambda: cfg.Lambda, prior: prior, quantiles: quantiles, contamination: cfg.Contamination, combinedHistory: newRingBuffer(1000), minDataPoints: cfg.MinDataPoints, }, nil } // Fit seeds all base detectors from labelled-normal vectors. // SEAD itself has no training phase; only the base detectors are fitted. func (s *SEADDetector) Fit(vectors []types.FeatureVector) error { for i, d := range s.detectors { if err := d.Fit(vectors); err != nil { return fmt.Errorf("sead: fit detector %q: %w", s.names[i], err) } } return nil } // Update propagates the feature vector to all base detectors. func (s *SEADDetector) Update(vector types.FeatureVector) error { for i, d := range s.detectors { if err := d.Update(vector); err != nil { return fmt.Errorf("sead: update detector %q: %w", s.names[i], err) } } return nil } // Score implements Algorithm 1 from the SEAD paper. // // Steps: // 1. Score each base detector → raw scores s̃_i. // Each detector also self-updates its internal state (RRCF inserts // the point into the forest; COPOD appends to its copula buffer; // IForest adds to its retraining buffer; MAD buffers for calibration). // 2. Quantile-normalise each s̃_i to ŝ_i ∈ [0,1] via streaming window. // 3. Compute softmax weights p_i = exp(w_i) / Σ exp(w_j). // 4. Combined score S = Σ p_i · ŝ_i. // 5. Update weights: w_i -= η · ∂L/∂w_i // where L = S + λ · KL(p || π). // 6. Threshold S against rolling (1-contamination)-quantile of S history. func (s *SEADDetector) Score(vector types.FeatureVector) (types.AnomalyResult, error) { n := len(s.detectors) // ── Step 1: Score all base detectors ────────────────────────────────────── // Each detector's Score method is responsible for self-updating (RRCF inserts // into its forest; COPOD appends to its copula buffer; etc.). We do NOT call // d.Update separately here to avoid double-counting in detectors that already // self-update inside Score. rawScores := make([]float64, n) anomalyFlags := make([]bool, n) for i, d := range s.detectors { res, err := d.Score(vector) if err != nil { // Degrade gracefully: treat failed detector as neutral (score=0.5). rawScores[i] = 0.5 } else { rawScores[i] = res.Score anomalyFlags[i] = res.IsAnomaly } } s.mu.Lock() defer s.mu.Unlock() // ── Step 2: Quantile-normalise scores to [0,1] ──────────────────────────── normScores := make([]float64, n) for i, raw := range rawScores { normScores[i] = s.quantiles[i].push(raw) } // ── Step 3: Softmax weights ─────────────────────────────────────────────── p := softmax(s.weights) // ── Step 4: Combined score ──────────────────────────────────────────────── combined := 0.0 for i := range p { combined += p[i] * normScores[i] } // ── Step 5: Weight update (MWU gradient step) ───────────────────────────── // Loss L(w) = combined(w) + λ · KL(softmax(w) || π) // ∂L/∂w_i = p_i · (ŝ_i - combined) + λ · (p_i - π_i) // // This is the closed-form gradient for softmax + weighted sum + KL penalty. for i := range s.weights { gradCombined := p[i] * (normScores[i] - combined) gradKL := s.lambda * (p[i] - s.prior[i]) s.weights[i] -= s.eta * (gradCombined + gradKL) } // ── Step 6: Threshold decision ──────────────────────────────────────────── // Use FIFO ring buffer: oldest score is evicted automatically after // 1000 time steps, giving the threshold a finite, sliding memory. s.combinedHistory.push(combined) threshold := s.combinedHistory.quantileVal(1.0 - s.contamination) isAnomaly := s.combinedHistory.size > s.minDataPoints && combined > threshold confidence := 0.0 if threshold > 1e-9 { confidence = math.Min(combined/threshold, 1.0) } return types.AnomalyResult{ Timestamp: vector.Timestamp, Score: combined, IsAnomaly: isAnomaly, Confidence: confidence, Method: "SEAD", Details: s.detailString(p, normScores, anomalyFlags), }, nil } // GetDetector returns a base detector by name. Returns nil if not found. func (s *SEADDetector) GetDetector(name string) AnomalyDetector { s.mu.Lock() defer s.mu.Unlock() for i, n := range s.names { if n == name { return s.detectors[i] } } return nil } // Weights returns a copy of the current softmax-normalised detector weights. // Useful for logging and diagnostics. Thread-safe. func (s *SEADDetector) Weights() []float64 { s.mu.Lock() defer s.mu.Unlock() return softmax(s.weights) } // WeightSummary returns a human-readable string of detector weights. func (s *SEADDetector) WeightSummary() string { w := s.Weights() var sb strings.Builder for i, name := range s.names { if i > 0 { sb.WriteString(" | ") } sb.WriteString(fmt.Sprintf("%s=%.3f", name, w[i])) } return sb.String() } // detailString builds a diagnostic annotation for AnomalyResult.Details. // Caller must hold s.mu. func (s *SEADDetector) detailString(p, normScores []float64, flags []bool) string { var parts []string for i, name := range s.names { flag := "" if flags[i] { flag = "!" } parts = append(parts, fmt.Sprintf("%s%s:w=%.2f,s=%.2f", name, flag, p[i], normScores[i])) } return strings.Join(parts, " ") } // ─── Math helpers ───────────────────────────────────────────────────────────── // softmax returns exp(w_i) / Σ exp(w_j) with numerical stability (max subtraction). func softmax(w []float64) []float64 { maxW := w[0] for _, v := range w[1:] { if v > maxW { maxW = v } } out := make([]float64, len(w)) var sum float64 for i, v := range w { out[i] = math.Exp(v - maxW) sum += out[i] } for i := range out { out[i] /= sum } return out } // ─── Factory helpers ────────────────────────────────────────────────────────── // NewSEADWithAllDetectors constructs a SEAD ensemble from six base detectors: // MAD, RRCF-fast, RRCF-mid, RRCF-slow, COPOD, IsolationForest. // // SEAD's MWU weight-update naturally up-weights the variant that consistently // separates anomalies from normal windows, and adapts when the stream // distribution shifts (e.g. time-of-day effects). // // MAD auto-calibration: the MADDetector buffers the first madCalibSize // NormalizedVectors, derives per-feature median and MAD, and starts scoring // once calibration is complete. Calibration requires no external tooling. // SEAD down-weights MAD automatically during the warmup phase. func NewSEADWithAllDetectors( copodBufferSize int, copodThreshold float64, rrcfVariants RRCFVariantsConfig, madThreshold float64, madCalibSize int, seadCfg SEADConfig, ) (*SEADDetector, error) { if rrcfVariants.Fast.NumTrees == 0 { rrcfVariants.Fast.NumTrees = 50 } if rrcfVariants.Fast.TreeSize == 0 { rrcfVariants.Fast.TreeSize = 32 } if rrcfVariants.Fast.ThresholdPercentile == 0 { rrcfVariants.Fast.ThresholdPercentile = 0.85 } if rrcfVariants.Mid.NumTrees == 0 { rrcfVariants.Mid.NumTrees = 150 } if rrcfVariants.Mid.TreeSize == 0 { rrcfVariants.Mid.TreeSize = 64 } if rrcfVariants.Mid.ThresholdPercentile == 0 { rrcfVariants.Mid.ThresholdPercentile = 0.85 } if rrcfVariants.Slow.NumTrees == 0 { rrcfVariants.Slow.NumTrees = 200 } if rrcfVariants.Slow.TreeSize == 0 { rrcfVariants.Slow.TreeSize = 128 } if rrcfVariants.Slow.ThresholdPercentile == 0 { rrcfVariants.Slow.ThresholdPercentile = 0.85 } // ── Construct base detectors ────────────────────────────────────────────── copod, err := NewCOPODDetector(copodBufferSize, copodThreshold) if err != nil { return nil, fmt.Errorf("sead: copod: %w", err) } rrcfFast := NewRRCFDetector( rrcfVariants.Fast.NumTrees, rrcfVariants.Fast.TreeSize, 0, rrcfVariants.Fast.ThresholdPercentile, ) rrcfMid := NewRRCFDetector( rrcfVariants.Mid.NumTrees, rrcfVariants.Mid.TreeSize, 0, rrcfVariants.Mid.ThresholdPercentile, ) rrcfSlow := NewRRCFDetector( rrcfVariants.Slow.NumTrees, rrcfVariants.Slow.TreeSize, 0, rrcfVariants.Slow.ThresholdPercentile, ) if madCalibSize <= 0 { madCalibSize = 100 } mad := NewMADDetectorAutoCalibrate(madThreshold, madCalibSize) return NewSEADDetector( []AnomalyDetector{mad, rrcfFast, rrcfMid, rrcfSlow, copod}, []string{"MAD", "RRCF-fast", "RRCF-mid", "RRCF-slow", "COPOD"}, seadCfg, ) }