294 lines
8.1 KiB
Go
294 lines
8.1 KiB
Go
// Command pipeline is the entry point for the MFT anomaly detection pipeline.
//
// Startup order:
//
//  1. Load and compile config (masking patterns → *regexp.Regexp).
//  2. Allocate channels with fixed capacities to enable backpressure.
//  3. Start HealthMonitor.
//  4. Start collectors (MetricCollector, LogCollector, SystemctlCollector).
//  5. Start TransformEngine (DuckDB, schema, pre-compiled query).
//  6. Start DetectionLayer (attaching the auto-scaling controller if enabled).
//  7. Start anomaly sink goroutine.
//  8. Wait for SIGINT / SIGTERM.
//  9. Shut down gracefully, draining upstream to downstream: wait for the
//     metric and log collectors and the TransformEngine, close the feature
//     channel, wait for the DetectionLayer, close the anomaly channel, wait
//     for the sink, and finally wait for the HealthMonitor.
package main
|
||
|
||
import (
|
||
"bufio"
|
||
"context"
|
||
"encoding/json"
|
||
"flag"
|
||
"fmt"
|
||
"log"
|
||
"os"
|
||
"os/signal"
|
||
"sync"
|
||
"syscall"
|
||
"time"
|
||
|
||
"codeberg.org/pata1704/guenther/internal/collector"
|
||
"codeberg.org/pata1704/guenther/internal/config"
|
||
"codeberg.org/pata1704/guenther/internal/detect"
|
||
"codeberg.org/pata1704/guenther/internal/health"
|
||
"codeberg.org/pata1704/guenther/internal/transform"
|
||
"codeberg.org/pata1704/guenther/pkg/types"
|
||
)
|
||
|
||
func main() {
|
||
cfgPath := flag.String("config", "configs/default.yaml", "path to config file")
|
||
flag.Parse()
|
||
|
||
cfg, err := config.LoadConfig(*cfgPath)
|
||
if err != nil {
|
||
log.Fatalf("load config %q: %v", *cfgPath, err)
|
||
}
|
||
if err := cfg.Compile(); err != nil {
|
||
log.Fatalf("compile masking patterns: %v", err)
|
||
}
|
||
|
||
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||
defer cancel()
|
||
|
||
logChan := make(chan types.LogEvent, 1_000)
|
||
metricChan := make(chan types.MetricSnapshot, 100)
|
||
serviceStatusChan := make(chan types.ServiceStatus, 100)
|
||
featureChan := make(chan types.FeatureVector, 10)
|
||
anomalyChan := make(chan types.AnomalyResult, 50)
|
||
|
||
hm := health.NewHealthMonitor()
|
||
hm.Start(ctx, 5*time.Second)
|
||
|
||
metricColl := collector.NewMetricCollector(
|
||
metricChan, hm.Chan(),
|
||
time.Second,
|
||
cfg.Ingestion.NetInterface,
|
||
cfg.Ingestion.DiskDevice,
|
||
)
|
||
logColl := collector.NewLogCollector(cfg, logChan, hm.Chan())
|
||
sysColl := collector.NewSystemctlCollector(
|
||
cfg.Ingestion.SystemctlServices,
|
||
5*time.Second,
|
||
serviceStatusChan,
|
||
hm.Chan(),
|
||
)
|
||
|
||
metricColl.Start(ctx)
|
||
if err := logColl.Start(ctx); err != nil {
|
||
log.Fatalf("start log collector: %v", err)
|
||
}
|
||
sysColl.Start(ctx)
|
||
|
||
engine, err := transform.NewTransformEngine(cfg, logChan, metricChan, serviceStatusChan, featureChan, hm.Chan())
|
||
if err != nil {
|
||
log.Fatalf("create transform engine: %v", err)
|
||
}
|
||
engine.Start(ctx)
|
||
|
||
detector, err := buildDetector(cfg)
|
||
if err != nil {
|
||
log.Fatalf("build detector: %v", err)
|
||
}
|
||
detLayer := detect.NewDetectionLayer(detector, featureChan, anomalyChan, hm.Chan())
|
||
|
||
if cfg.Detection.AutoScaling.Enabled {
|
||
if sd, ok := detector.(*detect.SwitchableDetector); ok {
|
||
sc := detect.NewScalingController(
|
||
sd,
|
||
cfg.Detection.AutoScaling.HighThreshold,
|
||
cfg.Detection.AutoScaling.CritThreshold,
|
||
cfg.Detection.AutoScaling.DownThreshold,
|
||
cfg.Detection.AutoScaling.HighDuration,
|
||
cfg.Detection.AutoScaling.CritDuration,
|
||
cfg.Detection.AutoScaling.DownDuration,
|
||
)
|
||
detLayer.SetScalingController(sc)
|
||
log.Println("detector: auto-scaling enabled")
|
||
} else {
|
||
log.Println("warning: auto-scaling requested but detector is not switchable (requires SEAD ensemble)")
|
||
}
|
||
}
|
||
|
||
detLayer.Start(ctx)
|
||
|
||
anomalyLog := openLog(cfg.Output.AnomalyLogPath, "anomaly log")
|
||
if anomalyLog != nil {
|
||
defer anomalyLog.Close()
|
||
}
|
||
anomalyWriter := maybeWriter(anomalyLog)
|
||
|
||
var sinkWg sync.WaitGroup
|
||
sinkWg.Add(1)
|
||
go func() {
|
||
defer sinkWg.Done()
|
||
for res := range anomalyChan {
|
||
writeJSON(anomalyWriter, res)
|
||
if res.IsAnomaly {
|
||
log.Printf("[ANOMALY] time=%s score=%.4f method=%s details=%s",
|
||
res.Timestamp.Format(time.RFC3339), res.Score, res.Method, res.Details)
|
||
}
|
||
}
|
||
}()
|
||
|
||
// Optionally log SEAD weights periodically (when using SEAD ensemble).
|
||
if ens, ok := detector.(*detect.EnsembleDetector); ok {
|
||
go func() {
|
||
t := time.NewTicker(60 * time.Second)
|
||
defer t.Stop()
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case <-t.C:
|
||
if ws := ens.WeightSummary(); ws != "" {
|
||
log.Printf("[SEAD weights] %s", ws)
|
||
}
|
||
}
|
||
}
|
||
}()
|
||
}
|
||
|
||
log.Println("pipeline started – waiting for SIGINT / SIGTERM")
|
||
<-ctx.Done()
|
||
log.Println("shutting down…")
|
||
|
||
metricColl.Wait()
|
||
logColl.Wait()
|
||
engine.Wait()
|
||
|
||
close(featureChan)
|
||
detLayer.Wait()
|
||
|
||
close(anomalyChan)
|
||
sinkWg.Wait()
|
||
|
||
hm.Wait()
|
||
log.Println("pipeline stopped")
|
||
}
|
||
|
||
// buildDetector constructs the configured AnomalyDetector.
|
||
//
|
||
// Routing:
|
||
// 1. detector.ensemble.enabled = true → EnsembleDetector with the method
|
||
// specified by detector.ensemble.method ("avg"|"max"|"median"|"sead").
|
||
// 2. Otherwise fall through to detector.method ("copod"|"rrcf"|"isolation_forest").
|
||
func buildDetector(cfg *config.Config) (detect.AnomalyDetector, error) {
|
||
if cfg.Detection.Ensemble.Enabled {
|
||
method := detect.EnsembleMethod(cfg.Detection.Ensemble.Method)
|
||
if method == "" {
|
||
method = detect.EnsembleAVG // backward-compat default
|
||
}
|
||
|
||
// Map SEAD config from YAML to detect.SEADConfig.
|
||
seadCfg := detect.SEADConfig{
|
||
Eta: cfg.Detection.Ensemble.SEAD.Eta,
|
||
Lambda: cfg.Detection.Ensemble.SEAD.Lambda,
|
||
QuantileWindow: cfg.Detection.Ensemble.SEAD.QuantileWindow,
|
||
MinDataPoints: cfg.Detection.Ensemble.SEAD.MinDataPoints,
|
||
Contamination: cfg.Detection.Ensemble.Contamination,
|
||
}
|
||
// Apply defaults for zero-value fields.
|
||
if seadCfg.Eta == 0 {
|
||
seadCfg.Eta = 0.10
|
||
}
|
||
if seadCfg.QuantileWindow == 0 {
|
||
seadCfg.QuantileWindow = 300
|
||
}
|
||
if seadCfg.MinDataPoints == 0 {
|
||
seadCfg.MinDataPoints = 20
|
||
}
|
||
|
||
det, err := detect.NewEnsembleDetector(
|
||
method,
|
||
cfg.Detection.COPOD.BufferSize,
|
||
cfg.Detection.COPOD.Threshold,
|
||
detect.RRCFVariantsConfig{
|
||
Fast: detect.RRCFVariantConfig{
|
||
NumTrees: cfg.Detection.RRCFVariants.Fast.NumTrees,
|
||
TreeSize: cfg.Detection.RRCFVariants.Fast.TreeSize,
|
||
ThresholdPercentile: cfg.Detection.RRCFVariants.Fast.ThresholdPercentile,
|
||
},
|
||
Mid: detect.RRCFVariantConfig{
|
||
NumTrees: cfg.Detection.RRCFVariants.Mid.NumTrees,
|
||
TreeSize: cfg.Detection.RRCFVariants.Mid.TreeSize,
|
||
ThresholdPercentile: cfg.Detection.RRCFVariants.Mid.ThresholdPercentile,
|
||
},
|
||
Slow: detect.RRCFVariantConfig{
|
||
NumTrees: cfg.Detection.RRCFVariants.Slow.NumTrees,
|
||
TreeSize: cfg.Detection.RRCFVariants.Slow.TreeSize,
|
||
ThresholdPercentile: cfg.Detection.RRCFVariants.Slow.ThresholdPercentile,
|
||
},
|
||
},
|
||
cfg.Detection.Ensemble.Contamination,
|
||
seadCfg,
|
||
)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("build ensemble detector (%s): %w", method, err)
|
||
}
|
||
log.Printf("detector: Ensemble method=%s contamination=%.2f", method, cfg.Detection.Ensemble.Contamination)
|
||
if method == detect.EnsembleSEAD {
|
||
log.Printf("detector: SEAD η=%.3f λ=%.3f quantile_window=%d",
|
||
seadCfg.Eta, seadCfg.Lambda, seadCfg.QuantileWindow)
|
||
|
||
// Wrap in SwitchableDetector if using SEAD (required for 3-stage scaling).
|
||
if sead := det.SEAD(); sead != nil {
|
||
return detect.NewSwitchableDetector(sead), nil
|
||
}
|
||
}
|
||
return det, nil
|
||
}
|
||
|
||
switch cfg.Detection.Method {
|
||
case "copod":
|
||
return detect.NewCOPODDetector(
|
||
cfg.Detection.COPOD.BufferSize,
|
||
cfg.Detection.COPOD.Threshold,
|
||
)
|
||
case "rrcf":
|
||
return detect.NewRRCFDetector(
|
||
cfg.Detection.RRCF.NumTrees,
|
||
cfg.Detection.RRCF.TreeSize,
|
||
0,
|
||
cfg.Detection.RRCF.ThresholdPercentile,
|
||
), nil
|
||
default: // "isolation_forest"
|
||
return detect.NewIsolationForestDetector(
|
||
5_000, 100, 100, 256, 0.05, 10.0,
|
||
), nil
|
||
}
|
||
}
|
||
|
||
// openLog opens path for appending, creating it with permissions 0644 if it
// does not exist. An empty path means "no log" and yields nil. Opening is
// best-effort: on failure a warning mentioning label and path is logged and
// nil is returned, so callers must tolerate a nil *os.File.
func openLog(path, label string) *os.File {
	if path != "" {
		const flags = os.O_APPEND | os.O_CREATE | os.O_WRONLY
		file, openErr := os.OpenFile(path, flags, 0o644)
		if openErr == nil {
			return file
		}
		log.Printf("warning: cannot open %s %q: %v", label, path, openErr)
	}
	return nil
}
|
||
|
||
// maybeWriter wraps f in a buffered writer with a 64 KiB buffer. A nil file
// (no log configured) yields a nil writer.
func maybeWriter(f *os.File) *bufio.Writer {
	const bufSize = 64 << 10 // 64 KiB
	var w *bufio.Writer
	if f != nil {
		w = bufio.NewWriterSize(f, bufSize)
	}
	return w
}
|
||
|
||
// writeJSON appends v to w as one line of JSON (newline-delimited JSON) and
// flushes immediately, so each record reaches the underlying writer before
// the call returns. A nil w is a no-op. Marshal, write and flush failures are
// logged, not returned; a failed marshal writes nothing.
func writeJSON(w *bufio.Writer, v any) {
	if w == nil {
		return
	}

	line, err := json.Marshal(v)
	if err != nil {
		log.Printf("marshal: %v", err)
		return
	}
	line = append(line, '\n')

	if _, err = w.Write(line); err != nil {
		log.Printf("write log: %v", err)
		return
	}
	if err = w.Flush(); err != nil {
		log.Printf("flush log: %v", err)
	}
}
|