guenther/internal/detect/rrcf.go

173 lines
4.9 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package detect provides anomaly detection algorithms and ensemble logic.
package detect
import (
"fmt"
"log"
"math"
"sync"
"codeberg.org/pata1704/guenther/pkg/types"
"codeberg.org/pata1704/rrcf"
)
// RRCFDetector wraps an rrcf.Forest behind the AnomalyDetector interface.
//
// Scoring strategy: score-then-insert (online streaming). Outside the warmup
// phase, each call to Score:
//  1. scores the point without inserting it permanently (the forest scores via
//     an ephemeral insertion);
//  2. inserts the point permanently so the forest stays fresh.
//
// Fit and Score hold mu for their entire duration, so they serialize with each
// other.
type RRCFDetector struct {
	// mu is held by Fit and Score while they read or modify the fields below.
	mu sync.Mutex

	// forest is nil until Fit or the first Score call creates it.
	forest *rrcf.Forest
	// thresholdPct is the quantile of the rolling score window used as the
	// anomaly threshold (e.g. 0.65 = 65th percentile).
	thresholdPct float64
	// numTrees is the number of trees in the forest.
	numTrees int
	// treeSize is the sliding-window capacity per tree.
	treeSize int
	// warmup is the number of vectors to buffer before scoring begins;
	// 0 disables buffering.
	warmup int
	// counter is the key passed to the next forest.Insert call.
	counter int
	// buf holds vectors buffered during warmup.
	buf []types.FeatureVector

	// Rolling score window for adaptive threshold calculation.
	// Uses a FIFO ring buffer; only scores after warmupDiscard are included.
	scoreWindow *ringBuffer
	// warmupDiscard is the number of scores to discard after forest
	// initialisation.
	warmupDiscard int
	// scored is the total number of scores seen (including discarded ones).
	scored int
}
// NewRRCFDetector returns a configured RRCFDetector. No forest is built here;
// Fit or the first call to Score creates it.
//
// Parameters:
//   - numTrees: number of trees in the forest (200 recommended).
//   - treeSize: sliding-window capacity per tree (256 recommended).
//   - warmup: vectors to buffer before scoring starts (0 starts immediately).
//   - thresholdPct: quantile of the rolling score window used as the
//     threshold. For example, 0.65 flags a vector as anomalous when its score
//     exceeds the 65th percentile of recent scores.
//
// Fixed internal settings:
//   - the first 10 scores are discarded because the forest is not yet stable;
//   - the rolling score window has capacity 60.
func NewRRCFDetector(numTrees, treeSize, warmup int, thresholdPct float64) *RRCFDetector {
	const (
		scoreWindowCapacity = 60 // capacity of the rolling score window
		discardedScores     = 10 // scores ignored right after forest start-up
	)

	d := &RRCFDetector{
		numTrees:     numTrees,
		treeSize:     treeSize,
		warmup:       warmup,
		thresholdPct: thresholdPct,
	}
	d.scoreWindow = newRingBuffer(scoreWindowCapacity)
	d.warmupDiscard = discardedScores
	return d
}
// Fit seeds the forest from a slice of FeatureVectors.
//
// It replaces any existing forest with a fresh one whose dimension is taken
// from the first vector, and it resets the insert counter used as point keys.
// A vector the forest rejects is logged and skipped; such failures do not make
// Fit return an error. An empty slice is a no-op: the existing forest (if any)
// is left untouched.
//
// Fit always returns nil.
func (d *RRCFDetector) Fit(vectors []types.FeatureVector) error {
	if len(vectors) == 0 {
		return nil
	}
	dim := len(vectors[0].NormalizedVector)

	d.mu.Lock()
	defer d.mu.Unlock()

	d.forest = rrcf.NewForest(d.numTrees, dim, d.treeSize)
	d.counter = 0
	for _, v := range vectors {
		if err := d.forest.Insert(v.NormalizedVector, d.counter); err != nil {
			log.Printf("rrcf: fit insert: %v", err)
			continue
		}
		// Only successful inserts consume a key, so after the loop counter
		// equals the number of points actually in the forest.
		d.counter++
	}
	// Report what was really inserted; rejected vectors were skipped above.
	log.Printf("rrcf: forest seeded with %d of %d points (trees=%d, treeSize=%d)",
		d.counter, len(vectors), d.numTrees, d.treeSize)
	return nil
}
// Score returns an AnomalyResult for vector.
//
// The forest is created lazily on the first call, using the dimension of this
// vector. During the one-shot warmup phase (warmup > 0 and fewer than warmup
// vectors buffered) the vector is buffered and a zero-score, non-anomalous
// result is returned. When the buffer fills, its vectors are inserted into the
// forest and warmup ends permanently.
//
// After warmup every vector is scored, then inserted permanently
// (score-then-insert). The first warmupDiscard scores are returned but never
// flagged, and they never enter the rolling window. Once the window holds at
// least 10 scores, a vector is flagged when its score exceeds the thresholdPct
// quantile of the window. Confidence is score/threshold capped at 1.0 (0 while
// no threshold is available).
//
// An error is returned only if the forest fails to score the vector; insert
// failures are logged and do not fail the call.
func (d *RRCFDetector) Score(vector types.FeatureVector) (types.AnomalyResult, error) {
	// minWindowScores is how many post-discard scores the rolling window must
	// hold before any vector can be flagged.
	const minWindowScores = 10

	d.mu.Lock()
	defer d.mu.Unlock()

	// Lazy forest initialisation on the first Score call.
	if d.forest == nil {
		dim := len(vector.NormalizedVector)
		d.forest = rrcf.NewForest(d.numTrees, dim, d.treeSize)
	}

	// Warmup buffering.
	if d.warmup > 0 && len(d.buf) < d.warmup {
		d.buf = append(d.buf, vector)
		if len(d.buf) == d.warmup {
			for _, v := range d.buf {
				if err := d.forest.Insert(v.NormalizedVector, d.counter); err != nil {
					log.Printf("rrcf: warmup insert: %v", err)
				}
				// The key is consumed even on failure, as in the streaming
				// insert below.
				d.counter++
			}
			d.buf = nil
			log.Printf("rrcf: warmup complete (%d vectors)", d.warmup)
			// Warmup is one-shot. With buf reset to nil, the guard above would
			// hold again on the next call and restart buffering forever, so
			// the detector would never score. Clearing warmup ends the phase.
			d.warmup = 0
		}
		return types.AnomalyResult{
			Timestamp: vector.Timestamp,
			Score:     0,
			IsAnomaly: false,
			Method:    "RRCF",
		}, nil
	}

	// Score via ephemeral insertion.
	score, err := d.forest.Score(vector.NormalizedVector)
	if err != nil {
		return types.AnomalyResult{}, fmt.Errorf("rrcf: %w", err)
	}

	// Permanent streaming insert to keep the forest fresh.
	if err := d.forest.Insert(vector.NormalizedVector, d.counter); err != nil {
		log.Printf("rrcf: insert: %v", err)
	}
	d.counter++
	d.scored++

	// Discard the first warmupDiscard scores: the forest is still settling
	// and scores are artificially high, which would anchor the threshold.
	if d.scored <= d.warmupDiscard {
		return types.AnomalyResult{
			Timestamp: vector.Timestamp,
			Score:     score,
			IsAnomaly: false,
			Method:    "RRCF",
		}, nil
	}

	// Update rolling score window (ring buffer).
	d.scoreWindow.push(score)

	// Only make decisions once the window holds enough scores.
	isAnomaly := false
	var threshold float64
	if d.scoreWindow.size >= minWindowScores {
		threshold = d.rollingThreshold()
		isAnomaly = score > threshold
	}

	// Guard against dividing by a zero (or unset) threshold.
	confidence := 0.0
	if threshold > 1e-9 {
		confidence = math.Min(score/threshold, 1.0)
	}

	return types.AnomalyResult{
		Timestamp:  vector.Timestamp,
		Score:      score,
		IsAnomaly:  isAnomaly,
		Confidence: confidence,
		Method:     "RRCF",
	}, nil
}
// rollingThreshold reports the current adaptive threshold: the thresholdPct
// quantile of the scores held in the rolling window.
// The caller must already hold d.mu.
func (d *RRCFDetector) rollingThreshold() float64 {
	window, q := d.scoreWindow, d.thresholdPct
	return window.quantileVal(q)
}
// Update is part of the AnomalyDetector interface and deliberately does
// nothing for RRCF: Score already inserts every vector it scores, so there is
// no separate model-update step. It always returns nil.
func (d *RRCFDetector) Update(types.FeatureVector) error {
	return nil
}