// Package detect provides anomaly detection algorithms and ensemble logic.
package detect

import (
	"fmt"
	"math"
	"sort"
	"strings"
	"sync"

	"codeberg.org/pata1704/guenther/pkg/types"
)

// EnsembleMethod selects the score-aggregation strategy used by EnsembleDetector.
type EnsembleMethod string

const (
	// EnsembleAVG combines normalised sub-scores by arithmetic mean.
	EnsembleAVG EnsembleMethod = "avg"
	// EnsembleMAX takes the maximum of the normalised sub-scores (aggressive).
	EnsembleMAX EnsembleMethod = "max"
	// EnsembleMEDIAN uses the median of normalised sub-scores (robust to outliers).
	EnsembleMEDIAN EnsembleMethod = "median"
	// EnsembleSEAD delegates to an embedded SEADDetector (adaptive MWU weights).
	// This method is selected by setting detector.ensemble.method = "sead" in
	// the config. The four base detectors (MAD, RRCF, COPOD, IForest) are
	// instantiated with the same parameters as the non-SEAD ensemble paths and
	// the SEAD wrapper handles the online weight updates automatically.
	//
	// NOTE(review): NewEnsembleDetector hands a three-variant RRCF config to the
	// SEAD constructor, so the effective number of base detectors may differ
	// from four — confirm against SEADDetector.
	EnsembleSEAD EnsembleMethod = "sead"
)

// RRCFVariantConfig holds parameters for a single named RRCF instance in the
// SEAD multi-horizon ensemble.
type RRCFVariantConfig struct {
	// NumTrees controls score stability: more trees → smoother / more conservative.
	NumTrees int
	// TreeSize is the sliding-window capacity per tree.
	TreeSize int
	// ThresholdPercentile is the per-model decision threshold for standalone use.
	ThresholdPercentile float64
}

// RRCFVariantsConfig groups the three RRCF horizon variants used by the SEAD ensemble.
//   - Fast: short memory, reactive to transient spikes
//   - Mid: medium memory, balanced sensitivity
//   - Slow: long memory, detects sustained / slow-drift events
type RRCFVariantsConfig struct {
	Fast RRCFVariantConfig
	Mid  RRCFVariantConfig
	Slow RRCFVariantConfig
}

// EnsembleDetector implements the AnomalyDetector interface by combining
// COPOD and RRCF scores using min-max normalisation.
//
// Scoring strategy (AVG / MAX / MEDIAN methods):
//  1. Each model produces a raw score on its own scale.
//  2. Both scores are normalised to [0, 1] using a rolling min/max window.
//  3. The combined score is the result of the selected aggregation function.
//  4. A window is flagged anomalous when combinedScore > threshold where
//     threshold = quantile(combinedHistory, 1-contamination), and only once
//     more than 10 combined scores have been recorded.
//
// SEAD method:
//
// When method == EnsembleSEAD the detector delegates entirely to an embedded
// SEADDetector, which adapts the weights of its base detectors online using
// Multiplicative Weights Update (MWU/FTRL). In that mode NewEnsembleDetector
// does not construct the COPOD and RRCF sub-detectors: the copod and rrcf
// fields stay nil and only the sead field is set.
type EnsembleDetector struct {
	// method is the aggregation strategy chosen at construction time.
	method EnsembleMethod

	// sub-detectors for AVG/MAX/MEDIAN methods (nil in SEAD mode)
	copod AnomalyDetector
	rrcf  AnomalyDetector

	// SEAD method: fully adaptive ensemble (replaces copod+rrcf when active)
	sead *SEADDetector

	// contamination is the expected anomaly fraction; Score uses
	// 1-contamination as the quantile for the decision threshold.
	contamination float64

	// mu guards the three history slices below.
	mu sync.Mutex
	// copodHistory and rrcfHistory hold recent raw sub-scores and supply the
	// min/max used for normalisation.
	copodHistory []float64
	rrcfHistory  []float64
	// combinedHistory holds recent aggregated scores used for the threshold.
	combinedHistory []float64
	// historySize caps each history slice (the constructor sets it to 1000).
	historySize int
}

// NewEnsembleDetector initialises the multi-model ensemble.
//
//   - method: "avg" | "max" | "median" | "sead". Any value other than "sead"
//     takes the classic COPOD+RRCF path; unrecognised values aggregate like "avg".
//   - copodBufferSize: sliding-window capacity for COPOD (≥ 100 recommended).
//   - copodThreshold: per-model threshold passed to COPODDetector.
//   - rrcfVariants: three-horizon RRCF config (fast/mid/slow). Used by SEAD;
//     the Mid variant is also used for the classic AVG/MAX/MEDIAN path, where
//     zero-valued fields fall back to 150 trees, tree size 64 and percentile 0.85.
//   - contamination: expected fraction of anomalies ∈ [0, 0.5). The value is
//     stored as given; it is not validated here.
//   - seadCfg: SEAD parameters (only used when method == "sead").
//     Pass detect.DefaultSEADConfig() when method != "sead".
//
// An error is returned when the SEAD or COPOD constructor fails.
func NewEnsembleDetector( method EnsembleMethod, copodBufferSize int, copodThreshold float64, rrcfVariants RRCFVariantsConfig, contamination float64, seadCfg SEADConfig, ) (*EnsembleDetector, error) { e := &EnsembleDetector{ method: method, contamination: contamination, historySize: 1000, } if method == EnsembleSEAD { // Delegate to SEADDetector with all six base detectors (3 RRCF horizons). // MAD is bootstrapped with identity priors (median=0, MAD=1); it will // calibrate itself during the pipeline warm-up phase. sead, err := NewSEADWithAllDetectors( copodBufferSize, copodThreshold, rrcfVariants, 3.5, 0, // madThreshold=3.5, madCalibSize=0→default 100 vectors seadCfg, ) if err != nil { return nil, fmt.Errorf("ensemble: sead: %w", err) } e.sead = sead } else { // Classic AVG/MAX/MEDIAN path: only COPOD + RRCF (Mid variant as default). copodDet, err := NewCOPODDetector(copodBufferSize, copodThreshold) if err != nil { return nil, fmt.Errorf("ensemble: %w", err) } e.copod = copodDet // Use Mid variant defaults for the classic ensemble path. midTrees := rrcfVariants.Mid.NumTrees if midTrees == 0 { midTrees = 150 } midSize := rrcfVariants.Mid.TreeSize if midSize == 0 { midSize = 64 } midPct := rrcfVariants.Mid.ThresholdPercentile if midPct == 0 { midPct = 0.85 } e.rrcf = NewRRCFDetector(midTrees, midSize, 0, midPct) } return e, nil } // SEAD returns the underlying SEADDetector if the ensemble is in SEAD mode. func (e *EnsembleDetector) SEAD() *SEADDetector { e.mu.Lock() defer e.mu.Unlock() return e.sead } // Fit seeds the underlying models from a slice of feature vectors. func (e *EnsembleDetector) Fit(vectors []types.FeatureVector) error { if e.method == EnsembleSEAD { return e.sead.Fit(vectors) } if err := e.copod.Fit(vectors); err != nil { return fmt.Errorf("ensemble: fit copod: %w", err) } if err := e.rrcf.Fit(vectors); err != nil { return fmt.Errorf("ensemble: fit rrcf: %w", err) } return nil } // Update propagates the vector to the underlying models. 
func (e *EnsembleDetector) Update(vector types.FeatureVector) error { if e.method == EnsembleSEAD { return e.sead.Update(vector) } if err := e.copod.Update(vector); err != nil { return fmt.Errorf("ensemble: update copod: %w", err) } if err := e.rrcf.Update(vector); err != nil { return fmt.Errorf("ensemble: update rrcf: %w", err) } return nil } // Score evaluates the feature vector. // // For SEAD method: delegates entirely to the embedded SEADDetector. // For AVG/MAX/MEDIAN: min-max normalises COPOD and RRCF scores and aggregates. func (e *EnsembleDetector) Score(vector types.FeatureVector) (types.AnomalyResult, error) { if e.method == EnsembleSEAD { res, err := e.sead.Score(vector) if err != nil { return types.AnomalyResult{}, fmt.Errorf("ensemble: sead score: %w", err) } return res, nil } resCOPOD, err := e.copod.Score(vector) if err != nil { return types.AnomalyResult{}, fmt.Errorf("ensemble: score copod: %w", err) } resRRCF, err := e.rrcf.Score(vector) if err != nil { return types.AnomalyResult{}, fmt.Errorf("ensemble: score rrcf: %w", err) } e.mu.Lock() defer e.mu.Unlock() e.appendHistory(&e.copodHistory, resCOPOD.Score) e.appendHistory(&e.rrcfHistory, resRRCF.Score) normCOPOD := minMaxNorm(resCOPOD.Score, e.copodHistory) normRRCF := minMaxNorm(resRRCF.Score, e.rrcfHistory) var combined float64 switch e.method { case EnsembleMAX: combined = math.Max(normCOPOD, normRRCF) case EnsembleMEDIAN: // Median of two values = average; kept for future N>2 extension. 
vals := []float64{normCOPOD, normRRCF} sort.Float64s(vals) combined = vals[len(vals)/2] default: // EnsembleAVG combined = (normCOPOD + normRRCF) / 2.0 } e.appendHistory(&e.combinedHistory, combined) const minDataPoints = 10 threshold := quantile(e.combinedHistory, 1.0-e.contamination) isAnomaly := len(e.combinedHistory) > minDataPoints && combined > threshold return types.AnomalyResult{ Timestamp: vector.Timestamp, Score: combined, IsAnomaly: isAnomaly, Confidence: math.Min(combined/math.Max(threshold, 1e-9), 1.0), Method: e.methodString(string(e.method), resCOPOD.IsAnomaly, resRRCF.IsAnomaly), }, nil } // WeightSummary returns the current SEAD detector weights as a human-readable // string. Returns "" when the ensemble is not using SEAD. func (e *EnsembleDetector) WeightSummary() string { if e.method != EnsembleSEAD || e.sead == nil { return "" } return e.sead.WeightSummary() } // appendHistory appends v to *h, evicting the oldest entry when full. // Caller must hold e.mu. func (e *EnsembleDetector) appendHistory(h *[]float64, v float64) { *h = append(*h, v) if len(*h) > e.historySize { *h = (*h)[1:] } } // methodString builds a concise label for AnomalyResult.Method. func (e *EnsembleDetector) methodString(method string, copodAnomaly, rrcfAnomaly bool) string { var active []string if copodAnomaly { active = append(active, "COPOD") } if rrcfAnomaly { active = append(active, "RRCF") } if len(active) > 0 { return fmt.Sprintf("Ensemble-%s(%s)", strings.ToUpper(method), strings.Join(active, "+")) } return fmt.Sprintf("Ensemble-%s(none)", strings.ToUpper(method)) } // ── score helpers ───────────────────────────────────────────────────────────── // minMaxNorm normalises v into [0, 1] using the observed min/max of history. 
func minMaxNorm(v float64, history []float64) float64 {
	if len(history) == 0 {
		return 0
	}

	// Single pass to find the observed range.
	minV, maxV := history[0], history[0]
	for _, h := range history[1:] {
		if h < minV {
			minV = h
		}
		if h > maxV {
			maxV = h
		}
	}

	// A (near-)constant history carries no scale information; report the
	// midpoint instead of dividing by a vanishing spread.
	spread := maxV - minV
	if spread < 1e-12 {
		return 0.5
	}

	// Clamp so values outside the observed range still map into [0, 1].
	norm := (v - minV) / spread
	switch {
	case norm < 0:
		return 0
	case norm > 1:
		return 1
	}
	return norm
}

// quantile returns the p-th quantile of data without modifying the slice.
//
// The result is the element at index floor(n*p) of the sorted copy, with the
// index limited to the valid range. p is clamped to [0, 1]: values ≤ 0 (and
// NaN) yield the minimum, values ≥ 1 yield the maximum. An empty slice yields 0.
func quantile(data []float64, p float64) float64 {
	n := len(data)
	if n == 0 {
		return 0
	}

	sorted := make([]float64, n)
	copy(sorted, data)
	sort.Float64s(sorted)

	// Clamp p before converting to an index: a negative product would index
	// out of range, and converting NaN or ±Inf to int is implementation-defined.
	if math.IsNaN(p) || p <= 0 {
		return sorted[0]
	}
	if p >= 1 {
		return sorted[n-1]
	}

	idx := int(float64(n) * p)
	if idx >= n {
		idx = n - 1
	}
	return sorted[idx]
}