guenther/internal/detect/ensemble.go

325 lines
9.9 KiB
Go

// Package detect provides anomaly detection algorithms and ensemble logic.
package detect
import (
"fmt"
"math"
"sort"
"strings"
"sync"
"codeberg.org/pata1704/guenther/pkg/types"
)
// EnsembleMethod selects the score-aggregation strategy used by EnsembleDetector.
// It is a string type; the values declared in this file are "avg", "max",
// "median" and "sead".
type EnsembleMethod string
const (
// EnsembleAVG combines normalised sub-scores by arithmetic mean.
EnsembleAVG EnsembleMethod = "avg"
// EnsembleMAX takes the maximum of the normalised sub-scores (aggressive).
EnsembleMAX EnsembleMethod = "max"
// EnsembleMEDIAN uses the median of normalised sub-scores (robust to outliers).
EnsembleMEDIAN EnsembleMethod = "median"
// EnsembleSEAD delegates scoring to an embedded SEADDetector, which is
// described as adapting per-detector weights online (MWU).
// This method is selected by setting detector.ensemble.method = "sead" in
// the config. NewEnsembleDetector builds the SEAD detector through
// NewSEADWithAllDetectors from the same COPOD and RRCF parameters that the
// non-SEAD ensemble paths receive.
// NOTE(review): this comment previously listed four base detectors (MAD,
// RRCF, COPOD, IForest); with three RRCF horizon variants the count would be
// six — confirm against the SEAD implementation.
EnsembleSEAD EnsembleMethod = "sead"
)
// RRCFVariantConfig holds parameters for a single named RRCF instance in the
// SEAD multi-horizon ensemble.
//
// On the classic (non-SEAD) path NewEnsembleDetector reads the Mid variant
// and substitutes a default for any field left at its zero value
// (150 trees, tree size 64, threshold percentile 0.85).
type RRCFVariantConfig struct {
// NumTrees controls score stability: more trees → smoother / more conservative.
NumTrees int
// TreeSize is the sliding-window capacity per tree.
TreeSize int
// ThresholdPercentile is the per-model decision threshold for standalone use.
// NOTE(review): the 0.85 default suggests a fraction in (0, 1) rather than a
// percentage — confirm against the RRCF detector.
ThresholdPercentile float64
}
// RRCFVariantsConfig groups the three RRCF horizon variants used by the SEAD ensemble.
//   - Fast: short memory, reactive to transient spikes
//   - Mid: medium memory, balanced sensitivity
//   - Slow: long memory, detects sustained / slow-drift events
//
// The whole struct is handed to NewSEADWithAllDetectors in SEAD mode; the
// classic AVG/MAX/MEDIAN path in NewEnsembleDetector reads only Mid.
type RRCFVariantsConfig struct {
Fast RRCFVariantConfig
Mid RRCFVariantConfig
Slow RRCFVariantConfig
}
// EnsembleDetector implements the AnomalyDetector interface by combining
// COPOD and RRCF scores using min-max normalisation.
//
// Scoring strategy (AVG / MAX / MEDIAN methods):
//  1. Each model produces a raw score on its own scale.
//  2. Both scores are normalised to [0, 1] against the min/max of a rolling
//     history of that model's recent raw scores.
//  3. The combined score is the result of the selected aggregation function.
//  4. A window is flagged anomalous when combinedScore > threshold, where
//     threshold = quantile(combinedHistory, 1-contamination), and only once
//     more than 10 combined scores have been recorded.
//
// SEAD method:
//
// When method == EnsembleSEAD the detector delegates entirely to an embedded
// SEADDetector, which wraps the base detectors and is described as using
// Multiplicative Weights Update (MWU/FTRL) to adapt weights online. In that
// mode NewEnsembleDetector populates only the sead field; copod and rrcf are
// left nil. For every other method copod and rrcf are set and sead stays nil.
type EnsembleDetector struct {
// method is set at construction and selects the scoring path.
method EnsembleMethod
// sub-detectors for AVG/MAX/MEDIAN methods
copod AnomalyDetector
rrcf AnomalyDetector
// SEAD method: fully adaptive ensemble (replaces copod+rrcf when active)
sead *SEADDetector
// contamination is the expected anomaly fraction; Score uses
// 1-contamination as the quantile for the decision threshold.
contamination float64
// mu guards the three history slices below; Score holds it while appending
// to and reading them. SEAD() also takes it when returning the sead field.
mu sync.Mutex
// Rolling histories of raw COPOD scores, raw RRCF scores and combined
// scores, each capped at historySize entries by appendHistory.
copodHistory []float64
rrcfHistory []float64
combinedHistory []float64
// historySize is the per-slice cap (NewEnsembleDetector sets it to 1000).
historySize int
}
// NewEnsembleDetector builds an EnsembleDetector for the requested method.
//
//   - method: "avg" | "max" | "median" | "sead"
//   - copodBufferSize: sliding-window capacity for COPOD (≥ 100 recommended).
//   - copodThreshold: per-model threshold passed to COPODDetector.
//   - rrcfVariants: three-horizon RRCF config (fast/mid/slow). Passed whole to
//     SEAD; the classic AVG/MAX/MEDIAN path uses only the Mid variant.
//   - contamination: expected fraction of anomalies ∈ [0, 0.5). The value is
//     stored as given; it is not validated here.
//   - seadCfg: SEAD parameters (only used when method == "sead").
//     Pass detect.DefaultSEADConfig() when method != "sead".
//
// The rolling history size is fixed at 1000 entries. Any error from the
// underlying constructors is returned wrapped with an "ensemble:" prefix.
func NewEnsembleDetector(
	method EnsembleMethod,
	copodBufferSize int, copodThreshold float64,
	rrcfVariants RRCFVariantsConfig,
	contamination float64,
	seadCfg SEADConfig,
) (*EnsembleDetector, error) {
	det := &EnsembleDetector{
		method:        method,
		contamination: contamination,
		historySize:   1000,
	}

	if method == EnsembleSEAD {
		// SEAD path: everything is delegated to a SEADDetector built with all
		// base detectors (noted upstream as six, i.e. three RRCF horizons).
		// MAD starts from identity priors (median=0, MAD=1) and is expected to
		// calibrate itself during the pipeline warm-up phase.
		const (
			madThreshold = 3.5
			madCalibSize = 0 // 0 → default of 100 vectors
		)
		adaptive, err := NewSEADWithAllDetectors(
			copodBufferSize, copodThreshold,
			rrcfVariants,
			madThreshold, madCalibSize,
			seadCfg,
		)
		if err != nil {
			return nil, fmt.Errorf("ensemble: sead: %w", err)
		}
		det.sead = adaptive
		return det, nil
	}

	// Classic AVG/MAX/MEDIAN path: COPOD plus a single RRCF instance.
	copodDet, err := NewCOPODDetector(copodBufferSize, copodThreshold)
	if err != nil {
		return nil, fmt.Errorf("ensemble: %w", err)
	}
	det.copod = copodDet

	// Work on a local copy of the Mid variant and fill in defaults for any
	// field left at its zero value.
	mid := rrcfVariants.Mid
	if mid.NumTrees == 0 {
		mid.NumTrees = 150
	}
	if mid.TreeSize == 0 {
		mid.TreeSize = 64
	}
	if mid.ThresholdPercentile == 0 {
		mid.ThresholdPercentile = 0.85
	}
	det.rrcf = NewRRCFDetector(mid.NumTrees, mid.TreeSize, 0, mid.ThresholdPercentile)

	return det, nil
}
// SEAD returns the embedded SEADDetector, or nil when the ensemble was not
// built in SEAD mode. The field is read while holding e.mu.
func (e *EnsembleDetector) SEAD() *SEADDetector {
	e.mu.Lock()
	inner := e.sead
	e.mu.Unlock()
	return inner
}
// Fit seeds the underlying models from a slice of feature vectors.
//
// In SEAD mode the call is forwarded to the embedded SEADDetector and its
// error is returned as-is. Otherwise COPOD is fitted first, then RRCF; the
// first failure is returned wrapped and the remaining model is not fitted.
func (e *EnsembleDetector) Fit(vectors []types.FeatureVector) error {
	if e.method != EnsembleSEAD {
		if copodErr := e.copod.Fit(vectors); copodErr != nil {
			return fmt.Errorf("ensemble: fit copod: %w", copodErr)
		}
		if rrcfErr := e.rrcf.Fit(vectors); rrcfErr != nil {
			return fmt.Errorf("ensemble: fit rrcf: %w", rrcfErr)
		}
		return nil
	}
	return e.sead.Fit(vectors)
}
// Update feeds one feature vector to the underlying models.
//
// In SEAD mode the call is forwarded to the embedded SEADDetector and its
// error is returned as-is. Otherwise COPOD is updated first, then RRCF; the
// first failure is returned wrapped and the remaining model is not updated.
func (e *EnsembleDetector) Update(vector types.FeatureVector) error {
	switch e.method {
	case EnsembleSEAD:
		return e.sead.Update(vector)
	default:
		copodErr := e.copod.Update(vector)
		if copodErr != nil {
			return fmt.Errorf("ensemble: update copod: %w", copodErr)
		}
		rrcfErr := e.rrcf.Update(vector)
		if rrcfErr != nil {
			return fmt.Errorf("ensemble: update rrcf: %w", rrcfErr)
		}
		return nil
	}
}
// Score evaluates the feature vector and reports whether it is anomalous.
//
// SEAD method: delegates entirely to the embedded SEADDetector and returns its
// result unchanged.
//
// AVG/MAX/MEDIAN methods:
//  1. COPOD and RRCF each score the vector; a failure of either aborts the
//     call before any history is touched.
//  2. Both raw scores are appended to their rolling histories and min-max
//     normalised against them.
//  3. The normalised scores are aggregated according to e.method (any value
//     other than MAX or MEDIAN falls back to the arithmetic mean).
//  4. The combined score is appended to combinedHistory and compared with
//     quantile(combinedHistory, 1-contamination). IsAnomaly is only set once
//     more than minDataPoints combined scores have been recorded.
//
// On error the returned AnomalyResult is the zero value and the error wraps
// the failing sub-detector's error.
func (e *EnsembleDetector) Score(vector types.FeatureVector) (types.AnomalyResult, error) {
	if e.method == EnsembleSEAD {
		res, err := e.sead.Score(vector)
		if err != nil {
			return types.AnomalyResult{}, fmt.Errorf("ensemble: sead score: %w", err)
		}
		return res, nil
	}

	resCOPOD, err := e.copod.Score(vector)
	if err != nil {
		return types.AnomalyResult{}, fmt.Errorf("ensemble: score copod: %w", err)
	}
	resRRCF, err := e.rrcf.Score(vector)
	if err != nil {
		return types.AnomalyResult{}, fmt.Errorf("ensemble: score rrcf: %w", err)
	}

	// The histories are shared state; everything below runs under the lock.
	e.mu.Lock()
	defer e.mu.Unlock()

	e.appendHistory(&e.copodHistory, resCOPOD.Score)
	e.appendHistory(&e.rrcfHistory, resRRCF.Score)
	normCOPOD := minMaxNorm(resCOPOD.Score, e.copodHistory)
	normRRCF := minMaxNorm(resRRCF.Score, e.rrcfHistory)

	var combined float64
	switch e.method {
	case EnsembleMAX:
		combined = math.Max(normCOPOD, normRRCF)
	case EnsembleMEDIAN:
		// True median, written for N sub-scores so it keeps working if more
		// detectors are added. With an even count the two middle values are
		// averaged; picking vals[len/2] alone would return the upper one,
		// which for two sub-scores is simply the maximum.
		vals := []float64{normCOPOD, normRRCF}
		sort.Float64s(vals)
		mid := len(vals) / 2
		if len(vals)%2 == 0 {
			combined = (vals[mid-1] + vals[mid]) / 2.0
		} else {
			combined = vals[mid]
		}
	default: // EnsembleAVG
		combined = (normCOPOD + normRRCF) / 2.0
	}

	e.appendHistory(&e.combinedHistory, combined)

	// Suppress anomaly flags until the threshold is based on enough samples.
	const minDataPoints = 10
	threshold := quantile(e.combinedHistory, 1.0-e.contamination)
	isAnomaly := len(e.combinedHistory) > minDataPoints && combined > threshold

	return types.AnomalyResult{
		Timestamp: vector.Timestamp,
		Score:     combined,
		IsAnomaly: isAnomaly,
		// Ratio of score to threshold, capped at 1; the 1e-9 floor avoids a
		// division by zero when the threshold is 0.
		Confidence: math.Min(combined/math.Max(threshold, 1e-9), 1.0),
		Method:     e.methodString(string(e.method), resCOPOD.IsAnomaly, resRRCF.IsAnomaly),
	}, nil
}
// WeightSummary reports the current SEAD detector weights as a human-readable
// string, as produced by SEADDetector.WeightSummary. It returns "" when the
// ensemble is not in SEAD mode or has no SEAD detector attached.
func (e *EnsembleDetector) WeightSummary() string {
	if e.method == EnsembleSEAD && e.sead != nil {
		return e.sead.WeightSummary()
	}
	return ""
}
// appendHistory pushes v onto the end of *h and, if that makes the slice
// longer than e.historySize, drops the oldest (first) entry.
// Caller must hold e.mu.
func (e *EnsembleDetector) appendHistory(h *[]float64, v float64) {
	grown := append(*h, v)
	if overflow := len(grown) - e.historySize; overflow > 0 {
		// Only one value was added, so evicting one entry is enough.
		grown = grown[1:]
	}
	*h = grown
}
// methodString builds the label stored in AnomalyResult.Method:
// "Ensemble-<METHOD>(<detectors>)", where <METHOD> is the upper-cased method
// name and <detectors> lists the sub-detectors that flagged the vector
// ("COPOD", "RRCF", "COPOD+RRCF"), or "none" when neither did.
func (e *EnsembleDetector) methodString(method string, copodAnomaly, rrcfAnomaly bool) string {
	upper := strings.ToUpper(method)

	var fired []string
	for _, sub := range []struct {
		name string
		hit  bool
	}{
		{"COPOD", copodAnomaly},
		{"RRCF", rrcfAnomaly},
	} {
		if sub.hit {
			fired = append(fired, sub.name)
		}
	}

	if len(fired) == 0 {
		return fmt.Sprintf("Ensemble-%s(none)", upper)
	}
	return fmt.Sprintf("Ensemble-%s(%s)", upper, strings.Join(fired, "+"))
}
// ── score helpers ─────────────────────────────────────────────────────────────
// minMaxNorm rescales v to [0, 1] relative to the smallest and largest value
// in history.
//
// Special cases: an empty history yields 0; a history whose spread is below
// 1e-12 (effectively constant) yields 0.5; values outside the observed range
// are clamped to 0 or 1.
func minMaxNorm(v float64, history []float64) float64 {
	if len(history) == 0 {
		return 0
	}

	lo, hi := history[0], history[0]
	for i := 1; i < len(history); i++ {
		switch sample := history[i]; {
		case sample < lo:
			lo = sample
		case sample > hi:
			hi = sample
		}
	}

	width := hi - lo
	if width < 1e-12 {
		return 0.5
	}

	switch scaled := (v - lo) / width; {
	case scaled < 0:
		return 0
	case scaled > 1:
		return 1
	default:
		return scaled
	}
}
// quantile returns the p-th quantile of data without modifying the slice.
//
// The value is taken at index floor(n*p) of a sorted copy (no interpolation).
// Out-of-range inputs are clamped instead of indexing out of bounds:
// p <= 0 or NaN yields the minimum, p >= 1 (including +Inf) yields the
// maximum. An empty slice yields 0.
func quantile(data []float64, p float64) float64 {
	n := len(data)
	if n == 0 {
		return 0
	}

	sorted := make([]float64, n)
	copy(sorted, data)
	sort.Float64s(sorted)

	// Guard before converting to int: a negative product gives a negative
	// index, and converting NaN or ±Inf to int is implementation-defined.
	switch {
	case math.IsNaN(p) || p <= 0:
		return sorted[0]
	case p >= 1:
		return sorted[n-1]
	}

	idx := int(float64(n) * p)
	if idx >= n {
		// Floating-point rounding can push n*p up to n for p just below 1.
		idx = n - 1
	}
	return sorted[idx]
}