200 lines
6 KiB
Go
200 lines
6 KiB
Go
package detect
|
||
|
||
import (
|
||
"log"
|
||
"sync"
|
||
|
||
"codeberg.org/pata1704/guenther/pkg/types"
|
||
"github.com/e-XpertSolutions/go-iforest/iforest"
|
||
)
|
||
|
||
// IsolationForestDetector wraps go-iforest with thread-safe access and
|
||
// continuous background retraining on non-anomalous data to handle concept drift.
|
||
//
|
||
// During the warmup phase (model == nil) incoming vectors are buffered.
|
||
// Once warmupSize vectors have accumulated, the first training run executes
|
||
// synchronously so that the detector is never in an undefined trained state
|
||
// after the first window tick.
|
||
//
|
||
// Subsequent retraining is asynchronous: when trainingBuffer reaches
|
||
// bufferSize the buffer is swapped out under the lock, and training runs in
|
||
// a detached goroutine. The current model remains active during retraining,
|
||
// so scoring never blocks.
|
||
type IsolationForestDetector struct {
|
||
mu sync.RWMutex
|
||
model *iforest.Forest
|
||
trainingBuffer []types.FeatureVector
|
||
|
||
// Tuning knobs – set via constructor.
|
||
numTrees int
|
||
subSample int
|
||
contamination float64
|
||
bufferSize int
|
||
warmupSize int
|
||
threshold float64
|
||
}
|
||
|
||
// NewIsolationForestDetector creates a detector with the given parameters.
|
||
//
|
||
// - bufferSize: number of non-anomalous vectors to accumulate before
|
||
// triggering background retraining.
|
||
// - warmupSize: number of vectors to accumulate before the first (sync)
|
||
// training run. Must be ≤ bufferSize.
|
||
// - numTrees: number of isolation trees (typically 100).
|
||
// - subSample: subsample size per tree (typically 256).
|
||
// - contamination: expected fraction of anomalies (0 < c < 0.5).
|
||
// - threshold: score cutoff for IsAnomaly.
|
||
func NewIsolationForestDetector(
|
||
bufferSize, warmupSize, numTrees, subSample int,
|
||
contamination, threshold float64,
|
||
) *IsolationForestDetector {
|
||
if warmupSize <= 0 || warmupSize > bufferSize {
|
||
warmupSize = bufferSize
|
||
}
|
||
return &IsolationForestDetector{
|
||
bufferSize: bufferSize,
|
||
warmupSize: warmupSize,
|
||
numTrees: numTrees,
|
||
subSample: subSample,
|
||
contamination: contamination,
|
||
threshold: threshold,
|
||
}
|
||
}
|
||
|
||
// Fit trains a new Isolation Forest on vectors.
|
||
// Fit is safe to call concurrently with Score (uses a write lock only while
|
||
// swapping the model pointer).
|
||
func (d *IsolationForestDetector) Fit(vectors []types.FeatureVector) error {
|
||
if len(vectors) == 0 {
|
||
return nil
|
||
}
|
||
|
||
data := convertToMatrix(vectors)
|
||
forest := iforest.NewForest(d.numTrees, d.subSample, d.contamination)
|
||
forest.Train(data)
|
||
forest.Test(data)
|
||
|
||
d.mu.Lock()
|
||
d.model = forest
|
||
d.mu.Unlock()
|
||
|
||
log.Printf("iforest: trained on %d samples (trees=%d, subsample=%d, contamination=%.3f)",
|
||
len(vectors), d.numTrees, d.subSample, d.contamination)
|
||
return nil
|
||
}
|
||
|
||
// Score returns an AnomalyResult for vector.
|
||
//
|
||
// Pre-model (warmup) behaviour:
|
||
// - Vector is appended to trainingBuffer.
|
||
// - Once warmupSize is reached the first training run executes synchronously
|
||
// on the calling goroutine so subsequent Score calls have a model.
|
||
// - Returns score=0, IsAnomaly=false while warming up.
|
||
//
|
||
// Post-model behaviour:
|
||
// - Score is computed via the active model (read-lock only).
|
||
// - Non-anomalous vectors are appended to trainingBuffer.
|
||
// - When trainingBuffer reaches bufferSize, a background retrain fires.
|
||
func (d *IsolationForestDetector) Score(vector types.FeatureVector) (types.AnomalyResult, error) {
|
||
warmup := types.AnomalyResult{
|
||
Timestamp: vector.Timestamp,
|
||
Score: 0,
|
||
IsAnomaly: false,
|
||
Method: "IF",
|
||
}
|
||
|
||
// ── warmup phase ──────────────────────────────────────────────────────
|
||
d.mu.RLock()
|
||
model := d.model
|
||
d.mu.RUnlock()
|
||
|
||
if model == nil {
|
||
d.mu.Lock()
|
||
d.trainingBuffer = append(d.trainingBuffer, vector)
|
||
bufLen := len(d.trainingBuffer)
|
||
d.mu.Unlock()
|
||
|
||
if bufLen < d.warmupSize {
|
||
return warmup, nil
|
||
}
|
||
|
||
// Synchronous first fit to eliminate the cold-start gap.
|
||
d.mu.Lock()
|
||
buf := d.trainingBuffer
|
||
d.trainingBuffer = nil
|
||
d.mu.Unlock()
|
||
|
||
if err := d.Fit(buf); err != nil {
|
||
return warmup, err
|
||
}
|
||
|
||
d.mu.RLock()
|
||
model = d.model
|
||
d.mu.RUnlock()
|
||
|
||
if model == nil {
|
||
return warmup, nil // Fit failed silently – defensive
|
||
}
|
||
}
|
||
|
||
// ── inference ─────────────────────────────────────────────────────────
|
||
_, scores, err := model.Predict([][]float64{vector.NormalizedVector})
|
||
if err != nil {
|
||
return warmup, err
|
||
}
|
||
if len(scores) == 0 {
|
||
return warmup, nil
|
||
}
|
||
score := scores[0]
|
||
|
||
res := types.AnomalyResult{
|
||
Timestamp: vector.Timestamp,
|
||
Score: score,
|
||
IsAnomaly: score > d.threshold,
|
||
Confidence: score,
|
||
Method: "IF",
|
||
}
|
||
|
||
// Buffer non-anomalous vectors for background retraining.
|
||
if !res.IsAnomaly {
|
||
if err := d.Update(vector); err != nil {
|
||
log.Printf("iforest: update buffer: %v", err)
|
||
}
|
||
}
|
||
return res, nil
|
||
}
|
||
|
||
// Update appends a non-anomalous vector to the training buffer.
|
||
// If the buffer is full it is swapped atomically and a background goroutine
|
||
// retrains the model on the captured data.
|
||
func (d *IsolationForestDetector) Update(vector types.FeatureVector) error {
|
||
d.mu.Lock()
|
||
d.trainingBuffer = append(d.trainingBuffer, vector)
|
||
|
||
if len(d.trainingBuffer) < d.bufferSize {
|
||
d.mu.Unlock()
|
||
return nil
|
||
}
|
||
|
||
buf := make([]types.FeatureVector, len(d.trainingBuffer))
|
||
copy(buf, d.trainingBuffer)
|
||
d.trainingBuffer = nil
|
||
d.mu.Unlock()
|
||
|
||
go func() {
|
||
if err := d.Fit(buf); err != nil {
|
||
log.Printf("iforest: background retrain: %v", err)
|
||
}
|
||
}()
|
||
return nil
|
||
}
|
||
|
||
// ── helpers ───────────────────────────────────────────────────────────────────
|
||
|
||
func convertToMatrix(vectors []types.FeatureVector) [][]float64 {
|
||
m := make([][]float64, len(vectors))
|
||
for i, v := range vectors {
|
||
m[i] = v.NormalizedVector
|
||
}
|
||
return m
|
||
}
|