From 72635dc7b95cf4c8634bd57eb94808996868cf2c Mon Sep 17 00:00:00 2001 From: Patryk Hegenberg Date: Sun, 29 Mar 2026 10:03:18 +0200 Subject: [PATCH] commit for version used in evaluation of thesis --- Makefile | 50 ++ README.md | 212 ++++++ cmd/pipeline/main.go | 294 ++++++++ configs/default.yaml | 123 ++++ go.mod | 49 ++ go.sum | 125 ++++ internal/collector/log.go | 250 +++++++ internal/collector/log_test.go | 45 ++ internal/collector/metric.go | 542 ++++++++++++++ internal/collector/systemctl.go | 140 ++++ internal/config/config.go | 203 ++++++ internal/detect/copod.go | 98 +++ internal/detect/ensemble.go | 325 +++++++++ internal/detect/iforest.go | 200 ++++++ internal/detect/interface.go | 148 ++++ internal/detect/mad.go | 254 +++++++ internal/detect/mad_test.go | 114 +++ internal/detect/rrcf.go | 173 +++++ internal/detect/scaling.go | 299 ++++++++ internal/detect/sead.go | 507 ++++++++++++++ internal/detect/sead_test.go | 61 ++ internal/drain3/masking.go | 32 + internal/health/monitor.go | 111 +++ internal/transform/engine.go | 1091 +++++++++++++++++++++++++++++ internal/transform/engine_test.go | 106 +++ internal/transform/schema.go | 230 ++++++ pkg/types/types.go | 302 ++++++++ 27 files changed, 6084 insertions(+) create mode 100644 Makefile create mode 100644 README.md create mode 100644 cmd/pipeline/main.go create mode 100644 configs/default.yaml create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/collector/log.go create mode 100644 internal/collector/log_test.go create mode 100644 internal/collector/metric.go create mode 100644 internal/collector/systemctl.go create mode 100644 internal/config/config.go create mode 100644 internal/detect/copod.go create mode 100644 internal/detect/ensemble.go create mode 100644 internal/detect/iforest.go create mode 100644 internal/detect/interface.go create mode 100644 internal/detect/mad.go create mode 100644 internal/detect/mad_test.go create mode 100644 internal/detect/rrcf.go create mode 
100644 internal/detect/scaling.go create mode 100644 internal/detect/sead.go create mode 100644 internal/detect/sead_test.go create mode 100644 internal/drain3/masking.go create mode 100644 internal/health/monitor.go create mode 100644 internal/transform/engine.go create mode 100644 internal/transform/engine_test.go create mode 100644 internal/transform/schema.go create mode 100644 pkg/types/types.go diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..2b209a7 --- /dev/null +++ b/Makefile @@ -0,0 +1,50 @@ +BINARY := guenther +BUILD_DIR := build +CMD := ./cmd/pipeline/main.go +CONFIG := configs/default.yaml + +GO_IMAGE := golang:bookworm +BUILD_TAGS := duckdb_arrow +LDFLAGS := -s -w + +GO_BUILD_FLAGS := -tags=$(BUILD_TAGS) -buildvcs=false -ldflags='$(LDFLAGS)' + +# ── Targets ─────────────────────────────────────────────────────────────────── + +.PHONY: all build build-local test clean run help + +all: build + +## build: Build the binary inside a Docker container (no local toolchain needed) +build: + @mkdir -p $(BUILD_DIR) + docker run --rm \ + -v $(PWD):/app:Z \ + -w /app \ + $(GO_IMAGE) \ + sh -c "apt-get update -qq && \ + apt-get install -y -qq gcc libc6-dev && \ + CGO_ENABLED=1 go build $(GO_BUILD_FLAGS) -o $(BUILD_DIR)/$(BINARY) $(CMD) && \ + echo BUILD_OK" \ + 2>&1 + +## build-local: Build the binary using the local Go toolchain (requires gcc) +build-local: + @mkdir -p $(BUILD_DIR) + CGO_ENABLED=1 go build $(GO_BUILD_FLAGS) -o $(BUILD_DIR)/$(BINARY) $(CMD) + +## test: Run all tests (requires local Go toolchain with gcc) +test: + CGO_ENABLED=1 go test -v -tags=$(BUILD_TAGS) ./... 
+ +## run: Run the pipeline with the default config (binary must be built first) +run: $(BUILD_DIR)/$(BINARY) + ./$(BUILD_DIR)/$(BINARY) -config $(CONFIG) + +## clean: Remove build artefacts +clean: + rm -rf $(BUILD_DIR) + +## help: Show this help message +help: + @grep -E '^## ' $(MAKEFILE_LIST) | sed 's/^## / /' diff --git a/README.md b/README.md new file mode 100644 index 0000000..c4e1e2a --- /dev/null +++ b/README.md @@ -0,0 +1,212 @@ +# guenther + +A streaming anomaly detection pipeline for Managed-File-Transfer (MFT) infrastructure. +guenther ingests system metrics and application logs in real time, extracts structured +feature vectors per time window, and scores them with an ensemble of unsupervised +detectors — without any labelled training data. + +--- + +## How it works + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Ingestion │ +│ MetricCollector (/proc) LogCollector (inotify + Drain3) │ +│ SystemctlCollector (service states) │ +└────────────────────┬────────────────────────────────────────┘ + │ channels (backpressure) +┌────────────────────▼────────────────────────────────────────┐ +│ Transformation │ +│ TransformEngine – 30 s tumbling windows via DuckDB │ +│ 45 base features + N Drain3 parameter aggregates │ +└────────────────────┬────────────────────────────────────────┘ + │ +┌────────────────────▼────────────────────────────────────────┐ +│ Detection │ +│ EnsembleDetector (RRCF fast/mid/slow · COPOD · MAD) │ +│ SEAD online weight adaptation · auto-scaling (3 stages) │ +└────────────────────┬────────────────────────────────────────┘ + │ + anomalies.jsonl +``` + +### Packages + +| Path | Responsibility | +| -------------------- | -------------------------------------------------------------------------------- | +| `cmd/pipeline` | Entry point, wiring, graceful shutdown | +| `internal/collector` | `MetricCollector` (`/proc`), `LogCollector` (inotify), `SystemctlCollector` | +| `internal/transform` | `TransformEngine` — DuckDB 
windowed aggregation | +| `internal/detect` | `EnsembleDetector`, RRCF, COPOD, MAD, IsolationForest, SEAD, `ScalingController` | +| `internal/drain3` | Masking / parameter extraction wrapper around Drain3 | +| `internal/config` | YAML config loading and regex compilation | +| `internal/health` | `HealthMonitor` — per-stage counters | +| `pkg/types` | Shared types: `LogEvent`, `MetricSnapshot`, `FeatureVector`, `AnomalyResult` | + +--- + +## Requirements + +| Dependency | Notes | +| --------------- | ------------------------------------------------------------ | +| Docker | Required for the containerised build (recommended) | +| Go ≥ 1.25 | Only needed for local builds | +| gcc / libc6-dev | CGO is required by `go-duckdb` | +| Linux | Metric collection reads `/proc`; not supported on other OSes | + +--- + +## Building + +### Docker (recommended — no local toolchain needed) + +```bash +make build +``` + +The binary is written to `build/guenther`. + +### Local (requires Go + gcc) + +```bash +make build-local +``` + +--- + +## Running + +```bash +./build/guenther -config configs/default.yaml +``` + +guenther shuts down cleanly on `SIGINT` or `SIGTERM`. + +--- + +## Testing + +```bash +make test +``` + +--- + +## Configuration + +guenther is configured via a single YAML file (default: `configs/default.yaml`). + +```yaml +ingestion: + log_path: "/path/to/log/file/transfer.log" # file to tail + net_interface: "ens4" # interface for /proc/net/dev + disk_device: "vda1" # device for /proc/diskstats + systemctl_services: + - service1.service + - service2.service + +transformation: + window_size: "30s" # tumbling window length + db_path: "data/pipeline.duckdb" # DuckDB file (use :memory: for ephemeral) + +drain: + depth: 4 + sim_threshold: 0.4 + max_children: 100 + max_clusters: 1000 + masking_patterns: # applied in order before template mining + - name: "uuid" + pattern: '\b[0-9a-fA-F]{8}-...\b' + replace: "" + type: "string" + # ... 
see configs/default.yaml for the full set + +detector: + method: "ensemble" # fallback when ensemble.enabled = false + ensemble: + enabled: true + method: "sead" # avg | max | median | sead + contamination: 0.15 + sead: + eta: 0.1 + lambda: 0.01 + auto_scaling: + enabled: true + high_threshold: 75.0 # CPU % → switch to mid detector + critical_threshold: 90.0 # CPU % → switch to fast detector + down_threshold: 50.0 + high_duration: 90.0 # seconds load must persist before scaling + critical_duration: 120.0 + down_duration: 120.0 + rrcf_variants: + fast: { num_trees: 50, tree_size: 32, threshold_percentile: 0.85 } + mid: { num_trees: 150, tree_size: 64, threshold_percentile: 0.85 } + slow: { num_trees: 200, tree_size: 128, threshold_percentile: 0.85 } + copod: + buffer_size: 50 + threshold: 0.3 + mad: + threshold: 3.5 + calibration_size: 50 + +output: + feature_log_path: "logs/features.jsonl" + anomaly_log_path: "logs/anomalies.jsonl" +``` + +### Masking pattern types + +Patterns with `type: float` extract a named parameter into `FeatureVector.ParamAvg`; +patterns with `type: string` replace the match in-place before template mining. +Named patterns (`name != ""`) are aggregated as features per window. + +--- + +## Output + +**`logs/anomalies.jsonl`** — one JSON object per scored window: + +```json +{ + "timestamp": "2026-01-15T14:32:00Z", + "score": 0.8721, + "is_anomaly": true, + "confidence": 0.91, + "method": "sead_ensemble", + "details": "rrcf_slow=0.91 copod=0.83 mad=0.78" +} +``` + +**`logs/features.jsonl`** — raw feature vectors for offline analysis (optional). 
+ +--- + +## Project layout + +``` +guenther/ +├── cmd/ +│ └── pipeline/ +│ └── main.go +├── internal/ +│ ├── collector/ +│ ├── config/ +│ ├── detect/ +│ ├── drain3/ +│ ├── health/ +│ └── transform/ +├── pkg/ +│ └── types/ +├── configs/ +│ └── default.yaml +├── build/ # created by `make build` +├── Makefile +└── README.md +``` + +--- + +## License + +This project was developed as part of a Bachelor's thesis. diff --git a/cmd/pipeline/main.go b/cmd/pipeline/main.go new file mode 100644 index 0000000..d3d6eda --- /dev/null +++ b/cmd/pipeline/main.go @@ -0,0 +1,294 @@ +// Command pipeline is the entry point for the MFT anomaly detection pipeline. +// +// Startup order: +// 1. Load and compile config (masking patterns → *regexp.Regexp). +// 2. Allocate channels with fixed capacities to enable backpressure. +// 3. Start HealthMonitor. +// 4. Start collectors (MetricCollector, LogCollector, SystemctlCollector). +// 5. Start TransformEngine (DuckDB, schema, pre-compiled query). +// 6. Start DetectionLayer. +// 7. Start anomaly sink goroutine. +// 8. Wait for SIGINT / SIGTERM. +// 9. Graceful shutdown in pipeline order (collectors → transform → detection → sink), HealthMonitor last. 
+package main + +import ( + "bufio" + "context" + "encoding/json" + "flag" + "fmt" + "log" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "codeberg.org/pata1704/guenther/internal/collector" + "codeberg.org/pata1704/guenther/internal/config" + "codeberg.org/pata1704/guenther/internal/detect" + "codeberg.org/pata1704/guenther/internal/health" + "codeberg.org/pata1704/guenther/internal/transform" + "codeberg.org/pata1704/guenther/pkg/types" +) + +func main() { + cfgPath := flag.String("config", "configs/default.yaml", "path to config file") + flag.Parse() + + cfg, err := config.LoadConfig(*cfgPath) + if err != nil { + log.Fatalf("load config %q: %v", *cfgPath, err) + } + if err := cfg.Compile(); err != nil { + log.Fatalf("compile masking patterns: %v", err) + } + + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + logChan := make(chan types.LogEvent, 1_000) + metricChan := make(chan types.MetricSnapshot, 100) + serviceStatusChan := make(chan types.ServiceStatus, 100) + featureChan := make(chan types.FeatureVector, 10) + anomalyChan := make(chan types.AnomalyResult, 50) + + hm := health.NewHealthMonitor() + hm.Start(ctx, 5*time.Second) + + metricColl := collector.NewMetricCollector( + metricChan, hm.Chan(), + time.Second, + cfg.Ingestion.NetInterface, + cfg.Ingestion.DiskDevice, + ) + logColl := collector.NewLogCollector(cfg, logChan, hm.Chan()) + sysColl := collector.NewSystemctlCollector( + cfg.Ingestion.SystemctlServices, + 5*time.Second, + serviceStatusChan, + hm.Chan(), + ) + + metricColl.Start(ctx) + if err := logColl.Start(ctx); err != nil { + log.Fatalf("start log collector: %v", err) + } + sysColl.Start(ctx) + + engine, err := transform.NewTransformEngine(cfg, logChan, metricChan, serviceStatusChan, featureChan, hm.Chan()) + if err != nil { + log.Fatalf("create transform engine: %v", err) + } + engine.Start(ctx) + + detector, err := buildDetector(cfg) + if err != nil { + log.Fatalf("build 
detector: %v", err) + } + detLayer := detect.NewDetectionLayer(detector, featureChan, anomalyChan, hm.Chan()) + + if cfg.Detection.AutoScaling.Enabled { + if sd, ok := detector.(*detect.SwitchableDetector); ok { + sc := detect.NewScalingController( + sd, + cfg.Detection.AutoScaling.HighThreshold, + cfg.Detection.AutoScaling.CritThreshold, + cfg.Detection.AutoScaling.DownThreshold, + cfg.Detection.AutoScaling.HighDuration, + cfg.Detection.AutoScaling.CritDuration, + cfg.Detection.AutoScaling.DownDuration, + ) + detLayer.SetScalingController(sc) + log.Println("detector: auto-scaling enabled") + } else { + log.Println("warning: auto-scaling requested but detector is not switchable (requires SEAD ensemble)") + } + } + + detLayer.Start(ctx) + + anomalyLog := openLog(cfg.Output.AnomalyLogPath, "anomaly log") + if anomalyLog != nil { + defer anomalyLog.Close() + } + anomalyWriter := maybeWriter(anomalyLog) + + var sinkWg sync.WaitGroup + sinkWg.Add(1) + go func() { + defer sinkWg.Done() + for res := range anomalyChan { + writeJSON(anomalyWriter, res) + if res.IsAnomaly { + log.Printf("[ANOMALY] time=%s score=%.4f method=%s details=%s", + res.Timestamp.Format(time.RFC3339), res.Score, res.Method, res.Details) + } + } + }() + + // Optionally log SEAD weights periodically (when using SEAD ensemble). + if ens, ok := detector.(*detect.EnsembleDetector); ok { + go func() { + t := time.NewTicker(60 * time.Second) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + if ws := ens.WeightSummary(); ws != "" { + log.Printf("[SEAD weights] %s", ws) + } + } + } + }() + } + + log.Println("pipeline started – waiting for SIGINT / SIGTERM") + <-ctx.Done() + log.Println("shutting down…") + + metricColl.Wait() + logColl.Wait() + engine.Wait() + + close(featureChan) + detLayer.Wait() + + close(anomalyChan) + sinkWg.Wait() + + hm.Wait() + log.Println("pipeline stopped") +} + +// buildDetector constructs the configured AnomalyDetector. +// +// Routing: +// 1. 
detector.ensemble.enabled = true → EnsembleDetector with the method +// specified by detector.ensemble.method ("avg"|"max"|"median"|"sead"). +// 2. Otherwise fall through to detector.method ("copod"|"rrcf"|"isolation_forest"). +func buildDetector(cfg *config.Config) (detect.AnomalyDetector, error) { + if cfg.Detection.Ensemble.Enabled { + method := detect.EnsembleMethod(cfg.Detection.Ensemble.Method) + if method == "" { + method = detect.EnsembleAVG // backward-compat default + } + + // Map SEAD config from YAML to detect.SEADConfig. + seadCfg := detect.SEADConfig{ + Eta: cfg.Detection.Ensemble.SEAD.Eta, + Lambda: cfg.Detection.Ensemble.SEAD.Lambda, + QuantileWindow: cfg.Detection.Ensemble.SEAD.QuantileWindow, + MinDataPoints: cfg.Detection.Ensemble.SEAD.MinDataPoints, + Contamination: cfg.Detection.Ensemble.Contamination, + } + // Apply defaults for zero-value fields. + if seadCfg.Eta == 0 { + seadCfg.Eta = 0.10 + } + if seadCfg.QuantileWindow == 0 { + seadCfg.QuantileWindow = 300 + } + if seadCfg.MinDataPoints == 0 { + seadCfg.MinDataPoints = 20 + } + + det, err := detect.NewEnsembleDetector( + method, + cfg.Detection.COPOD.BufferSize, + cfg.Detection.COPOD.Threshold, + detect.RRCFVariantsConfig{ + Fast: detect.RRCFVariantConfig{ + NumTrees: cfg.Detection.RRCFVariants.Fast.NumTrees, + TreeSize: cfg.Detection.RRCFVariants.Fast.TreeSize, + ThresholdPercentile: cfg.Detection.RRCFVariants.Fast.ThresholdPercentile, + }, + Mid: detect.RRCFVariantConfig{ + NumTrees: cfg.Detection.RRCFVariants.Mid.NumTrees, + TreeSize: cfg.Detection.RRCFVariants.Mid.TreeSize, + ThresholdPercentile: cfg.Detection.RRCFVariants.Mid.ThresholdPercentile, + }, + Slow: detect.RRCFVariantConfig{ + NumTrees: cfg.Detection.RRCFVariants.Slow.NumTrees, + TreeSize: cfg.Detection.RRCFVariants.Slow.TreeSize, + ThresholdPercentile: cfg.Detection.RRCFVariants.Slow.ThresholdPercentile, + }, + }, + cfg.Detection.Ensemble.Contamination, + seadCfg, + ) + if err != nil { + return nil, fmt.Errorf("build 
ensemble detector (%s): %w", method, err) + } + log.Printf("detector: Ensemble method=%s contamination=%.2f", method, cfg.Detection.Ensemble.Contamination) + if method == detect.EnsembleSEAD { + log.Printf("detector: SEAD η=%.3f λ=%.3f quantile_window=%d", + seadCfg.Eta, seadCfg.Lambda, seadCfg.QuantileWindow) + + // Wrap in SwitchableDetector if using SEAD (required for 3-stage scaling). + if sead := det.SEAD(); sead != nil { + return detect.NewSwitchableDetector(sead), nil + } + } + return det, nil + } + + switch cfg.Detection.Method { + case "copod": + return detect.NewCOPODDetector( + cfg.Detection.COPOD.BufferSize, + cfg.Detection.COPOD.Threshold, + ) + case "rrcf": + return detect.NewRRCFDetector( + cfg.Detection.RRCF.NumTrees, + cfg.Detection.RRCF.TreeSize, + 0, + cfg.Detection.RRCF.ThresholdPercentile, + ), nil + default: // "isolation_forest" + return detect.NewIsolationForestDetector( + 5_000, 100, 100, 256, 0.05, 10.0, + ), nil + } +} + +func openLog(path, label string) *os.File { + if path == "" { + return nil + } + f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) + if err != nil { + log.Printf("warning: cannot open %s %q: %v", label, path, err) + return nil + } + return f +} + +func maybeWriter(f *os.File) *bufio.Writer { + if f == nil { + return nil + } + return bufio.NewWriterSize(f, 64*1024) +} + +func writeJSON(w *bufio.Writer, v any) { + if w == nil { + return + } + b, err := json.Marshal(v) + if err != nil { + log.Printf("marshal: %v", err) + return + } + if _, err := w.Write(append(b, '\n')); err != nil { + log.Printf("write log: %v", err) + return + } + if err := w.Flush(); err != nil { + log.Printf("flush log: %v", err) + } +} diff --git a/configs/default.yaml b/configs/default.yaml new file mode 100644 index 0000000..e42c778 --- /dev/null +++ b/configs/default.yaml @@ -0,0 +1,123 @@ +ingestion: + log_path: "/path/to/log/file/transfer.log" + net_interface: "ens4" + disk_device: "vda1" + systemctl_services: + - 
service1.service + - service2.service + +transformation: + window_size: "30s" + db_path: "data/pipeline_test.duckdb" + +drain: + depth: 4 + sim_threshold: 0.4 + max_children: 100 + max_clusters: 1000 + masking_patterns: + - name: "loglevel" + pattern: '^(\S+)' + replace: "" + type: "string" + + - name: "" + pattern: '(\d{4}-\d{2}-\d{2})' + replace: "" + type: "string" + + - name: "" + pattern: '(\d{2}:\d{2}:\d{2}\.\d{6})' + replace: "