325 lines
9.9 KiB
Go
325 lines
9.9 KiB
Go
// Package detect provides anomaly detection algorithms and ensemble logic.
|
|
package detect
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
|
|
"codeberg.org/pata1704/guenther/pkg/types"
|
|
)
|
|
|
|
// EnsembleMethod selects the score-aggregation strategy used by EnsembleDetector.
type EnsembleMethod string

const (
	// EnsembleAVG combines normalised sub-scores by arithmetic mean.
	// EnsembleDetector also uses this aggregation for any method value it
	// does not recognise.
	EnsembleAVG EnsembleMethod = "avg"

	// EnsembleMAX takes the maximum of the normalised sub-scores (aggressive).
	EnsembleMAX EnsembleMethod = "max"

	// EnsembleMEDIAN uses the median of normalised sub-scores (robust to outliers).
	EnsembleMEDIAN EnsembleMethod = "median"

	// EnsembleSEAD delegates to an embedded SEADDetector (adaptive MWU weights).
	// This method is selected by setting detector.ensemble.method = "sead" in
	// the config. Instead of the classic COPOD + Mid-RRCF pair, the base
	// detectors are built by NewSEADWithAllDetectors from the same COPOD
	// parameters, all three RRCF horizon variants (fast/mid/slow) and a MAD
	// detector; the SEAD wrapper handles the online weight updates automatically.
	EnsembleSEAD EnsembleMethod = "sead"
)
|
|
|
|
// RRCFVariantConfig is the parameter set for one named RRCF instance inside
// the SEAD multi-horizon ensemble.
type RRCFVariantConfig struct {
	// NumTrees is the forest size: more trees give smoother, more
	// conservative scores. TreeSize is the sliding-window capacity of each tree.
	NumTrees, TreeSize int

	// ThresholdPercentile is the per-model decision threshold used when the
	// instance runs standalone.
	ThresholdPercentile float64
}

// RRCFVariantsConfig bundles the three RRCF horizon variants consumed by the
// SEAD ensemble (NewEnsembleDetector also uses Mid on its classic
// AVG/MAX/MEDIAN path):
//   - Fast: short memory, reacts to transient spikes
//   - Mid: medium memory, balanced sensitivity
//   - Slow: long memory, catches sustained / slow-drift events
type RRCFVariantsConfig struct {
	Fast, Mid, Slow RRCFVariantConfig
}
|
|
|
|
// EnsembleDetector implements the AnomalyDetector interface by combining
|
|
// COPOD and RRCF scores using min-max normalisation.
|
|
//
|
|
// Scoring strategy (AVG / MAX / MEDIAN methods):
|
|
// 1. Each model produces a raw score on its own scale.
|
|
// 2. Both scores are normalised to [0, 1] using a rolling min/max window.
|
|
// 3. The combined score is the result of the selected aggregation function.
|
|
// 4. A window is flagged anomalous when combinedScore > threshold where
|
|
// threshold = quantile(combinedHistory, 1-contamination).
|
|
//
|
|
// SEAD method:
|
|
//
|
|
// When method == EnsembleSEAD the detector delegates entirely to an embedded
|
|
// SEADDetector which wraps all four base detectors and uses Multiplicative
|
|
// Weights Update (MWU/FTRL) to adapt weights online. The COPOD and RRCF
|
|
// sub-detectors passed to NewEnsembleDetector are still created but are only
|
|
// used when method != EnsembleSEAD.
|
|
type EnsembleDetector struct {
|
|
method EnsembleMethod
|
|
|
|
// sub-detectors for AVG/MAX/MEDIAN methods
|
|
copod AnomalyDetector
|
|
rrcf AnomalyDetector
|
|
|
|
// SEAD method: fully adaptive ensemble (replaces copod+rrcf when active)
|
|
sead *SEADDetector
|
|
|
|
contamination float64
|
|
|
|
mu sync.Mutex
|
|
copodHistory []float64
|
|
rrcfHistory []float64
|
|
combinedHistory []float64
|
|
historySize int
|
|
}
|
|
|
|
// NewEnsembleDetector initialises the multi-model ensemble.
|
|
//
|
|
// - method: "avg" | "max" | "median" | "sead"
|
|
// - copodBufferSize: sliding-window capacity for COPOD (≥ 100 recommended).
|
|
// - copodThreshold: per-model threshold passed to COPODDetector.
|
|
// - rrcfVariants: three-horizon RRCF config (fast/mid/slow). Used by SEAD;
|
|
// the Mid variant is also used for the classic AVG/MAX/MEDIAN path.
|
|
// - contamination: expected fraction of anomalies ∈ [0, 0.5).
|
|
// - seadCfg: SEAD parameters (only used when method == "sead").
|
|
// Pass detect.DefaultSEADConfig() when method != "sead".
|
|
func NewEnsembleDetector(
|
|
method EnsembleMethod,
|
|
copodBufferSize int, copodThreshold float64,
|
|
rrcfVariants RRCFVariantsConfig,
|
|
contamination float64,
|
|
seadCfg SEADConfig,
|
|
) (*EnsembleDetector, error) {
|
|
e := &EnsembleDetector{
|
|
method: method,
|
|
contamination: contamination,
|
|
historySize: 1000,
|
|
}
|
|
|
|
if method == EnsembleSEAD {
|
|
// Delegate to SEADDetector with all six base detectors (3 RRCF horizons).
|
|
// MAD is bootstrapped with identity priors (median=0, MAD=1); it will
|
|
// calibrate itself during the pipeline warm-up phase.
|
|
sead, err := NewSEADWithAllDetectors(
|
|
copodBufferSize, copodThreshold,
|
|
rrcfVariants,
|
|
3.5, 0, // madThreshold=3.5, madCalibSize=0→default 100 vectors
|
|
seadCfg,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("ensemble: sead: %w", err)
|
|
}
|
|
e.sead = sead
|
|
} else {
|
|
// Classic AVG/MAX/MEDIAN path: only COPOD + RRCF (Mid variant as default).
|
|
copodDet, err := NewCOPODDetector(copodBufferSize, copodThreshold)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("ensemble: %w", err)
|
|
}
|
|
e.copod = copodDet
|
|
// Use Mid variant defaults for the classic ensemble path.
|
|
midTrees := rrcfVariants.Mid.NumTrees
|
|
if midTrees == 0 {
|
|
midTrees = 150
|
|
}
|
|
midSize := rrcfVariants.Mid.TreeSize
|
|
if midSize == 0 {
|
|
midSize = 64
|
|
}
|
|
midPct := rrcfVariants.Mid.ThresholdPercentile
|
|
if midPct == 0 {
|
|
midPct = 0.85
|
|
}
|
|
e.rrcf = NewRRCFDetector(midTrees, midSize, 0, midPct)
|
|
}
|
|
|
|
return e, nil
|
|
}
|
|
|
|
// SEAD returns the underlying SEADDetector if the ensemble is in SEAD mode.
|
|
func (e *EnsembleDetector) SEAD() *SEADDetector {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
return e.sead
|
|
}
|
|
|
|
// Fit seeds the underlying models from a slice of feature vectors.
|
|
func (e *EnsembleDetector) Fit(vectors []types.FeatureVector) error {
|
|
if e.method == EnsembleSEAD {
|
|
return e.sead.Fit(vectors)
|
|
}
|
|
if err := e.copod.Fit(vectors); err != nil {
|
|
return fmt.Errorf("ensemble: fit copod: %w", err)
|
|
}
|
|
if err := e.rrcf.Fit(vectors); err != nil {
|
|
return fmt.Errorf("ensemble: fit rrcf: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Update propagates the vector to the underlying models.
|
|
func (e *EnsembleDetector) Update(vector types.FeatureVector) error {
|
|
if e.method == EnsembleSEAD {
|
|
return e.sead.Update(vector)
|
|
}
|
|
if err := e.copod.Update(vector); err != nil {
|
|
return fmt.Errorf("ensemble: update copod: %w", err)
|
|
}
|
|
if err := e.rrcf.Update(vector); err != nil {
|
|
return fmt.Errorf("ensemble: update rrcf: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Score evaluates the feature vector.
|
|
//
|
|
// For SEAD method: delegates entirely to the embedded SEADDetector.
|
|
// For AVG/MAX/MEDIAN: min-max normalises COPOD and RRCF scores and aggregates.
|
|
func (e *EnsembleDetector) Score(vector types.FeatureVector) (types.AnomalyResult, error) {
|
|
if e.method == EnsembleSEAD {
|
|
res, err := e.sead.Score(vector)
|
|
if err != nil {
|
|
return types.AnomalyResult{}, fmt.Errorf("ensemble: sead score: %w", err)
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
resCOPOD, err := e.copod.Score(vector)
|
|
if err != nil {
|
|
return types.AnomalyResult{}, fmt.Errorf("ensemble: score copod: %w", err)
|
|
}
|
|
|
|
resRRCF, err := e.rrcf.Score(vector)
|
|
if err != nil {
|
|
return types.AnomalyResult{}, fmt.Errorf("ensemble: score rrcf: %w", err)
|
|
}
|
|
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
e.appendHistory(&e.copodHistory, resCOPOD.Score)
|
|
e.appendHistory(&e.rrcfHistory, resRRCF.Score)
|
|
|
|
normCOPOD := minMaxNorm(resCOPOD.Score, e.copodHistory)
|
|
normRRCF := minMaxNorm(resRRCF.Score, e.rrcfHistory)
|
|
|
|
var combined float64
|
|
switch e.method {
|
|
case EnsembleMAX:
|
|
combined = math.Max(normCOPOD, normRRCF)
|
|
case EnsembleMEDIAN:
|
|
// Median of two values = average; kept for future N>2 extension.
|
|
vals := []float64{normCOPOD, normRRCF}
|
|
sort.Float64s(vals)
|
|
combined = vals[len(vals)/2]
|
|
default: // EnsembleAVG
|
|
combined = (normCOPOD + normRRCF) / 2.0
|
|
}
|
|
|
|
e.appendHistory(&e.combinedHistory, combined)
|
|
|
|
const minDataPoints = 10
|
|
threshold := quantile(e.combinedHistory, 1.0-e.contamination)
|
|
isAnomaly := len(e.combinedHistory) > minDataPoints && combined > threshold
|
|
|
|
return types.AnomalyResult{
|
|
Timestamp: vector.Timestamp,
|
|
Score: combined,
|
|
IsAnomaly: isAnomaly,
|
|
Confidence: math.Min(combined/math.Max(threshold, 1e-9), 1.0),
|
|
Method: e.methodString(string(e.method), resCOPOD.IsAnomaly, resRRCF.IsAnomaly),
|
|
}, nil
|
|
}
|
|
|
|
// WeightSummary returns the current SEAD detector weights as a human-readable
|
|
// string. Returns "" when the ensemble is not using SEAD.
|
|
func (e *EnsembleDetector) WeightSummary() string {
|
|
if e.method != EnsembleSEAD || e.sead == nil {
|
|
return ""
|
|
}
|
|
return e.sead.WeightSummary()
|
|
}
|
|
|
|
// appendHistory appends v to *h, evicting the oldest entry when full.
|
|
// Caller must hold e.mu.
|
|
func (e *EnsembleDetector) appendHistory(h *[]float64, v float64) {
|
|
*h = append(*h, v)
|
|
if len(*h) > e.historySize {
|
|
*h = (*h)[1:]
|
|
}
|
|
}
|
|
|
|
// methodString builds a concise label for AnomalyResult.Method.
|
|
func (e *EnsembleDetector) methodString(method string, copodAnomaly, rrcfAnomaly bool) string {
|
|
var active []string
|
|
if copodAnomaly {
|
|
active = append(active, "COPOD")
|
|
}
|
|
if rrcfAnomaly {
|
|
active = append(active, "RRCF")
|
|
}
|
|
if len(active) > 0 {
|
|
return fmt.Sprintf("Ensemble-%s(%s)", strings.ToUpper(method), strings.Join(active, "+"))
|
|
}
|
|
return fmt.Sprintf("Ensemble-%s(none)", strings.ToUpper(method))
|
|
}
|
|
|
|
// ── score helpers ─────────────────────────────────────────────────────────────
|
|
|
|
// minMaxNorm maps v onto [0, 1] relative to the smallest and largest values
// in history, clamping values that fall outside that range. An empty history
// yields 0; a history whose spread is below 1e-12 yields the midpoint 0.5.
func minMaxNorm(v float64, history []float64) float64 {
	if len(history) == 0 {
		return 0
	}

	lo, hi := history[0], history[0]
	for i := 1; i < len(history); i++ {
		x := history[i]
		if x < lo {
			lo = x
		}
		if x > hi {
			hi = x
		}
	}

	width := hi - lo
	if width < 1e-12 {
		return 0.5
	}

	switch r := (v - lo) / width; {
	case r < 0:
		return 0
	case r > 1:
		return 1
	default:
		return r
	}
}
|
|
|
|
// quantile returns the p-th quantile of data without modifying the slice.
//
// It sorts a copy of data and returns the element at index floor(n*p),
// clamped to [0, n-1]: p <= 0 (or NaN) yields the minimum and p >= 1 yields
// the maximum. No interpolation is performed. Empty data yields 0.
func quantile(data []float64, p float64) float64 {
	n := len(data)
	if n == 0 {
		return 0
	}

	sorted := make([]float64, n)
	copy(sorted, data)
	sort.Float64s(sorted)

	// Guard the float→int conversion: a negative position (e.g. when a caller
	// passes 1-contamination with contamination > 1) used to produce a
	// negative index and panic, and converting NaN to int is
	// implementation-defined in Go.
	pos := float64(n) * p
	switch {
	case math.IsNaN(pos) || pos <= 0:
		return sorted[0]
	case pos >= float64(n):
		return sorted[n-1]
	default:
		return sorted[int(pos)] // pos ∈ (0, n) ⇒ index ∈ [0, n-1]
	}
}
|