// Package detect provides anomaly detection algorithms and ensemble logic. package detect import ( "fmt" "log" "math" "sync" "codeberg.org/pata1704/guenther/pkg/types" "codeberg.org/pata1704/rrcf" ) // RRCFDetector wraps pkg/rrcf.Forest with the AnomalyDetector interface. // // Scoring strategy: score-then-insert (online streaming). // Each call to Score: // 1. Scores the point without inserting (ephemeral key – thread-safe). // 2. Inserts the point permanently so the forest stays fresh. type RRCFDetector struct { mu sync.Mutex forest *rrcf.Forest thresholdPct float64 numTrees int treeSize int warmup int counter int buf []types.FeatureVector // Rolling score window for adaptive threshold calculation. // Uses a FIFO ring buffer; only scores after warmupDiscard are included. scoreWindow *ringBuffer warmupDiscard int // number of scores to discard after forest initialisation scored int // total scores seen (including discarded) } // NewRRCFDetector creates an RRCFDetector. // // - numTrees: number of trees in the forest (200 recommended). // - treeSize: sliding-window capacity per tree (256 recommended). // - warmup: vectors to buffer before first Score (pass 0 for immediate start). // - thresholdPct: percentile of rolling score window used as threshold. // E.g. 0.65 means: flag as anomaly if score > 65th percentile of recent scores. // // Internal defaults: // - warmupDiscard = 10 (discard the first 10 scores; forest is not yet stable) // - scoreWindowMax = 60 func NewRRCFDetector(numTrees, treeSize, warmup int, thresholdPct float64) *RRCFDetector { return &RRCFDetector{ numTrees: numTrees, treeSize: treeSize, warmup: warmup, thresholdPct: thresholdPct, scoreWindow: newRingBuffer(60), warmupDiscard: 10, } } // Fit seeds the forest from a slice of FeatureVectors. // It replaces any existing forest; the internal insert counter is reset. func (d *RRCFDetector) Fit(vectors []types.FeatureVector) error { if len(vectors) == 0 { return nil } dim := len(vectors[0].NormalizedVector) d.mu.Lock() defer d.mu.Unlock() d.forest = rrcf.NewForest(d.numTrees, dim, d.treeSize) d.counter = 0 for _, v := range vectors { if err := d.forest.Insert(v.NormalizedVector, d.counter); err != nil { log.Printf("rrcf: fit insert: %v", err) continue } d.counter++ } log.Printf("rrcf: forest seeded with %d points (trees=%d, treeSize=%d)", len(vectors), d.numTrees, d.treeSize) return nil } // Score returns an AnomalyResult for vector. // During the warmup phase (len(buf) < warmup) the vector is buffered and a // zero-score result is returned. func (d *RRCFDetector) Score(vector types.FeatureVector) (types.AnomalyResult, error) { d.mu.Lock() defer d.mu.Unlock() // Lazy forest initialisation on the first Score call. if d.forest == nil { dim := len(vector.NormalizedVector) d.forest = rrcf.NewForest(d.numTrees, dim, d.treeSize) } // Warmup buffering. if d.warmup > 0 && len(d.buf) < d.warmup { d.buf = append(d.buf, vector) if len(d.buf) == d.warmup { for _, v := range d.buf { _ = d.forest.Insert(v.NormalizedVector, d.counter) d.counter++ } d.buf = nil log.Printf("rrcf: warmup complete (%d vectors)", d.warmup) } return types.AnomalyResult{ Timestamp: vector.Timestamp, Score: 0, IsAnomaly: false, Method: "RRCF", }, nil } // Score via ephemeral insertion. score, err := d.forest.Score(vector.NormalizedVector) if err != nil { return types.AnomalyResult{}, fmt.Errorf("rrcf: %w", err) } // Permanent streaming insert to keep the forest fresh. if err := d.forest.Insert(vector.NormalizedVector, d.counter); err != nil { log.Printf("rrcf: insert: %v", err) } d.counter++ d.scored++ // Discard the first warmupDiscard scores: the forest is still settling // and scores are artificially high, which would anchor the threshold. if d.scored <= d.warmupDiscard { return types.AnomalyResult{ Timestamp: vector.Timestamp, Score: score, IsAnomaly: false, Method: "RRCF", }, nil } // Update rolling score window (ring buffer). d.scoreWindow.push(score) // Need at least 10 scores before making decisions. isAnomaly := false var threshold float64 if d.scoreWindow.size >= 10 { threshold = d.rollingThreshold() isAnomaly = score > threshold } confidence := 0.0 if threshold > 1e-9 { confidence = math.Min(score/threshold, 1.0) } return types.AnomalyResult{ Timestamp: vector.Timestamp, Score: score, IsAnomaly: isAnomaly, Confidence: confidence, Method: "RRCF", }, nil } // rollingThreshold returns the thresholdPct-quantile of the rolling score window. // Caller must hold d.mu. func (d *RRCFDetector) rollingThreshold() float64 { return d.scoreWindow.quantileVal(d.thresholdPct) } // Update is a no-op for RRCF: insertion happens inside Score. func (d *RRCFDetector) Update(_ types.FeatureVector) error { return nil }