// Package transform contains the DuckDB-backed Tumbling Window Engine.
package transform

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"math"
	"math/big"
	"strconv"
	"strings"
	"sync"
	"time"

	"codeberg.org/pata1704/guenther/internal/config"
	"codeberg.org/pata1704/guenther/pkg/types"
	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/duckdb/duckdb-go/v2"
)

// TransformEngine implements the DuckDB Tumbling-Window fusion layer.
//
// Architecture:
//   - raw_metrics (bronze layer): every MetricSnapshot field, 1 Hz rows
//   - log_events: parsed log lines with severity and parameters JSON
//   - log_params: numeric masking-pattern values as typed columns
//   - features: one row per window; feature_json for offline Python EDA
//
// Feature vector construction per window:
//  1. DuckDB fusion query aggregates raw_metrics + log_events into window-level
//     raw aggregates (avg_cpu, sum_tcp_retrans, …) and sc_* columns (DuckDB
//     RobustScaler: (x-median)/IQR, fitted on baseline via FitScaler).
//  2. processWindow computes engineered features (CPUDelta, NetDelta,
//     CPURollStd, CPUEfficiency, IOWaitProxy, LogDensity) in-process.
//  3. NormalizedVector layout:
//     [0..22]  sc_* DuckDB-scaled raw aggregates
//     [23..28] engineered features (appended unscaled; ratio/delta scale)
//     [29+]    Drain param averages (if numeric patterns configured)
//
// Any change to NormalizedVector layout requires retraining all detectors.
type TransformEngine struct { cfg *config.Config db *sql.DB conn *duckdb.Connector // kept for the Arrow zero-copy path logInputChan <-chan types.LogEvent metricInputChan <-chan types.MetricSnapshot serviceStatusInputChan <-chan types.ServiceStatus outputChan chan<- types.FeatureVector healthChan chan<- types.StageHealth windowSize time.Duration wg sync.WaitGroup mu sync.Mutex logBatch []types.LogEvent metricBatch []types.MetricSnapshot serviceStatusBatch []types.ServiceStatus // lastWindowStart prevents duplicate windows from being emitted when the // fusion query returns the same bucket twice within one ticker period. lastWindowStart time.Time pool memory.Allocator // Arrow allocator, reused across windows fusionQuery string paramNames []string featureInsertSQL string patternTypes map[string]string // Stateful buffers for feature engineering. // Only accessed from the single engine goroutine – no lock needed. cpuHistory []float64 // rolling window of avg_cpu, capacity 12 lastCPU float64 // avg_cpu from previous window lastNetOut float64 // avg_net_out from previous window lastNetIn float64 // avg_net_in from previous window (for DeltaNetIn) lastTCPRetrans float64 // sum_tcp_retrans from previous window (for DeltaTCPRetrans) lastCtxSwitches float64 // avg_ctx_switches from previous window (for DeltaCtx) tcpRetransHist []float64 // rolling window of sum_tcp_retrans, capacity 5 netOutHistory []float64 // rolling window of avg_net_out, capacity 5 // scalerFitted is set to true after FitScaler() has successfully written // parameters to scaler_params. When false the DuckDB scaler CTE falls back // to median=0/iqr=1 (identity transform) so the engine functions before // the baseline period has been recorded. scalerFitted bool processed uint64 // monotone window counter for health reporting avgLatency float64 // EWMA latency in ms for health reporting } // NewTransformEngine opens DuckDB, creates all tables, and pre-compiles queries. 
func NewTransformEngine( cfg *config.Config, logInput <-chan types.LogEvent, metricInput <-chan types.MetricSnapshot, serviceStatusInput <-chan types.ServiceStatus, output chan<- types.FeatureVector, health chan<- types.StageHealth, ) (*TransformEngine, error) { conn, err := duckdb.NewConnector(cfg.Transformation.DbPath, nil) if err != nil { return nil, fmt.Errorf("transform: open duckdb %q: %w", cfg.Transformation.DbPath, err) } db := sql.OpenDB(conn) if err := createSchema(db, cfg); err != nil { db.Close() return nil, err } windowInterval := fmt.Sprintf("%d seconds", int(cfg.Transformation.WindowSize.Seconds())) fusionQuery := BuildFusionQuery(cfg.Drain.MaskingPatterns, cfg.Ingestion.SystemctlServices, windowInterval) var paramNames []string patternTypes := make(map[string]string, len(cfg.Drain.MaskingPatterns)) for _, mp := range cfg.Drain.MaskingPatterns { if mp.Name != "" { paramNames = append(paramNames, mp.Name) patternTypes[mp.Name] = mp.Type } } return &TransformEngine{ cfg: cfg, db: db, conn: conn, logInputChan: logInput, metricInputChan: metricInput, serviceStatusInputChan: serviceStatusInput, outputChan: output, healthChan: health, windowSize: cfg.Transformation.WindowSize, pool: memory.NewGoAllocator(), fusionQuery: fusionQuery, paramNames: paramNames, patternTypes: patternTypes, featureInsertSQL: ` INSERT INTO features (window_start, feature_json) VALUES (?, ?) 
ON CONFLICT (window_start) DO UPDATE SET feature_json = excluded.feature_json`, }, nil } func createSchema(db *sql.DB, cfg *config.Config) error { stmts := []struct { sql string desc string }{ {"SET memory_limit = '512MB'", "set memory_limit"}, {"SET threads = 2", "set threads"}, {`CREATE TABLE IF NOT EXISTS raw_metrics ( timestamp TIMESTAMP WITH TIME ZONE, cpu_percent DOUBLE, cpu_iowait_percent DOUBLE, cpu_softirq_percent DOUBLE, context_switches_s DOUBLE, interrupts_s DOUBLE, memory_used_mb DOUBLE, memory_cached_mb DOUBLE, memory_dirty_mb DOUBLE, net_in_mbps DOUBLE, net_out_mbps DOUBLE, disk_read_mbps DOUBLE, disk_write_mbps DOUBLE, disk_read_time_s DOUBLE, disk_write_time_s DOUBLE, disk_io_ticks_s DOUBLE, network_errors_s DOUBLE, network_drops_s DOUBLE, tcp_retrans_s DOUBLE, tcp_timeouts_s DOUBLE, tcp_lost_retransmit_s DOUBLE, tcp_fast_retrans_s DOUBLE, softnet_dropped_s DOUBLE, softnet_time_squeeze_s DOUBLE, net_packets_in_s DOUBLE, net_packets_out_s DOUBLE, disk_reads_s DOUBLE, disk_writes_s DOUBLE )`, "create raw_metrics table"}, {`CREATE TABLE IF NOT EXISTS log_events ( timestamp TIMESTAMP WITH TIME ZONE, template_id INTEGER, severity VARCHAR, parameters JSON )`, "create log_events table"}, {`CREATE TABLE IF NOT EXISTS service_status ( timestamp TIMESTAMP WITH TIME ZONE, service_name VARCHAR, active_state VARCHAR, sub_state VARCHAR )`, "create service_status table"}, {BuildLogParamsSchema(cfg.Drain.MaskingPatterns), "create log_params table"}, // scaler_params: one row per feature; median (Q50) + IQR (Q75-Q25) fitted // from baseline data via FitScaler(). Queried inline by BuildFusionQuery. {BuildScalerParamsTable(), "create scaler_params table"}, // features: one row per window; anomaly_score is back-filled by the // detector via UpdateAnomalyScore after the detection stage runs. 
{`CREATE TABLE IF NOT EXISTS features ( window_start TIMESTAMP WITH TIME ZONE PRIMARY KEY, window_end TIMESTAMP WITH TIME ZONE, anomaly_score DOUBLE, feature_json JSON )`, "create features table"}, } for _, s := range stmts { if _, err := db.Exec(s.sql); err != nil { return fmt.Errorf("transform: %s: %w", s.desc, err) } } return nil } // ── lifecycle ───────────────────────────────────────────────────────────────── // Start launches the transform engine goroutine. // // Design: ingestion (log/metric channel reads) and control (ticker, ctx) are // handled in the SAME goroutine to avoid lock contention on the batches. // ctx.Done() is checked with priority via a non-blocking pre-select before the // blocking select, guaranteeing that the engine exits within one iteration of // cancellation regardless of channel pressure. func (e *TransformEngine) Start(ctx context.Context) { ticker := time.NewTicker(e.windowSize) reportTicker := time.NewTicker(5 * time.Second) e.wg.Add(1) go func() { defer e.wg.Done() defer ticker.Stop() defer reportTicker.Stop() for { // Priority exit: avoids starvation by busy input channels. select { case <-ctx.Done(): e.db.Close() return default: } select { case ev := <-e.logInputChan: e.mu.Lock() e.logBatch = append(e.logBatch, ev) e.mu.Unlock() case snap := <-e.metricInputChan: e.mu.Lock() e.metricBatch = append(e.metricBatch, snap) e.mu.Unlock() case stat := <-e.serviceStatusInputChan: e.mu.Lock() e.serviceStatusBatch = append(e.serviceStatusBatch, stat) e.mu.Unlock() case <-ticker.C: start := time.Now() e.processWindow() ms := time.Since(start).Seconds() * 1e3 e.mu.Lock() e.processed++ if e.avgLatency == 0 { e.avgLatency = ms } else { e.avgLatency = e.avgLatency*0.8 + ms*0.2 } e.mu.Unlock() case <-reportTicker.C: e.emitHealth() case <-ctx.Done(): e.db.Close() return } } }() } // Wait waits for the transform engine goroutine to exit after context cancellation. 
func (e *TransformEngine) Wait() { e.wg.Wait() } // FitScaler computes RobustScaler parameters (median Q50 + IQR Q75-Q25) from // raw_metrics and log_events rows in the half-open interval [fitStart, fitEnd) // and persists them to scaler_params. // // The RobustScaler uses median + IQR instead of mean + stddev to resist extreme // outliers in the baseline that would inflate the mean and push anomaly-phase z-scores // toward zero (as a StandardScaler would). This matches // sklearn.RobustScaler(quantile_range=(25, 75)). // // Call once after the baseline period and before the anomaly phase. After this, // BuildFusionQuery returns already-scaled sc_* values that populate // NormalizedVector slots 0–22. // // FitScaler is safe to call from any goroutine; it acquires no engine locks // because it only writes to DuckDB (which serialises writes internally). func (e *TransformEngine) FitScaler(fitStart, fitEnd time.Time) error { q := BuildFitScalerQuery() if _, err := e.db.Exec(q, fitStart, fitEnd); err != nil { return fmt.Errorf("transform: FitScaler [%s, %s): %w", fitStart.Format(time.RFC3339), fitEnd.Format(time.RFC3339), err) } e.mu.Lock() e.scalerFitted = true e.mu.Unlock() log.Printf("transform: FitScaler done – baseline [%s, %s)", fitStart.Format("15:04:05Z"), fitEnd.Format("15:04:05Z")) return nil } // ── window processing ───────────────────────────────────────────────────────── func (e *TransformEngine) processWindow() { e.mu.Lock() logs := e.logBatch metrics := e.metricBatch serviceStats := e.serviceStatusBatch e.logBatch = nil e.metricBatch = nil e.serviceStatusBatch = nil e.mu.Unlock() if len(metrics) == 0 && len(logs) == 0 && len(serviceStats) == 0 { return } if err := e.loadToDuckDB(logs, metrics, serviceStats); err != nil { log.Printf("transform: load: %v", err) return } fv, err := e.runFusionQuery() if err != nil { log.Printf("transform: fusion query: %v", err) return } if fv == nil || !fv.WindowStart.After(e.lastWindowStart) { return } e.lastWindowStart 
= fv.WindowStart fv.WindowEnd = fv.WindowStart.Add(e.windowSize) fv.Timestamp = time.Now() // ── Feature Engineering ─────────────────────────────────────────────────── // Temporal features capture cross-window dynamics not available from a // single-window aggregate. They are appended to NormalizedVector after // the DuckDB-scaled raw features. // // These features are NOT passed through the DuckDB RobustScaler because: // (a) CPUDelta / NetDelta are already on a %-point / MBps scale; the // detector needs the sign (positive = spike, negative = drop). // (b) CPURollStd, CPUEfficiency, IOWaitProxy, LogDensity are ratio/ // dimensionless features – their natural scale is already bounded. // Scaling them would require additional scaler_params rows and a second // CTE in BuildFusionQuery for marginal gain; fv.CPUDelta = fv.AvgCPUPercent - e.lastCPU fv.NetDelta = fv.AvgNetOutMBps - e.lastNetOut fv.DeltaCtx = fv.AvgCtxSwitches - e.lastCtxSwitches fv.DeltaNetIn = fv.AvgNetInMBps - e.lastNetIn fv.DeltaTCPRetrans = fv.SumTCPRetrans - e.lastTCPRetrans e.cpuHistory = append(e.cpuHistory, fv.AvgCPUPercent) if len(e.cpuHistory) > 12 { e.cpuHistory = e.cpuHistory[1:] } fv.CPURollStd = calcStdDev(e.cpuHistory) fv.CPUEfficiency = fv.AvgCPUPercent / (fv.AvgNetThroughput + 1.0) fv.CPUPerMB = fv.AvgCPUPercent / (fv.AvgNetThroughput + 1e-3) fv.NetworkDiskRatio = fv.AvgNetThroughput / (fv.AvgDiskReadMBps + fv.AvgDiskWriteMBps + 1e-3) fv.RetransPerMB = fv.SumTCPRetrans / (fv.AvgNetThroughput + 1e-3) fv.IOWaitProxy = fv.AvgDiskIOTicks / (fv.AvgCPUPercent + 1.0) fv.LogDensity = float64(fv.UniqueTemplates) / (float64(fv.LogCountTotal) + 1.0) e.tcpRetransHist = append(e.tcpRetransHist, fv.SumTCPRetrans) if len(e.tcpRetransHist) > 5 { e.tcpRetransHist = e.tcpRetransHist[1:] } fv.TcpRollStd = calcStdDev(e.tcpRetransHist) e.netOutHistory = append(e.netOutHistory, fv.AvgNetOutMBps) if len(e.netOutHistory) > 5 { e.netOutHistory = e.netOutHistory[1:] } fv.NetRollStd = calcStdDev(e.netOutHistory) 
fv.MemPressure = fv.MaxMemDirtyMB / (fv.AvgMemUsedMB + 1.0) fv.NetAsymmetry = fv.AvgNetInMBps / (fv.AvgNetOutMBps + 1e-3) e.lastCPU = fv.AvgCPUPercent e.lastNetOut = fv.AvgNetOutMBps e.lastNetIn = fv.AvgNetInMBps e.lastTCPRetrans = fv.SumTCPRetrans e.lastCtxSwitches = fv.AvgCtxSwitches // ── Final NormalizedVector Assembly ────────────────────────────────────── // sc_* base features (from DuckDB) are already at fv.NormalizedVector[0:len(scalerFeatureNames)] fv.NormalizedVector = append(fv.NormalizedVector, fv.CPUDelta, fv.CPURollStd, fv.CPUEfficiency, fv.DeltaCtx, fv.NetDelta, fv.AvgNetThroughput, fv.CPUPerMB, fv.NetworkDiskRatio, fv.RetransPerPacket, fv.RetransPerMB, fv.AvgDiskLatencyMS, float64(fv.LogCountTotal), float64(fv.UniqueTemplates), fv.LogDensity, fv.IOWaitProxy, fv.DeltaNetIn, fv.DeltaTCPRetrans, fv.TcpRollStd, fv.NetRollStd, fv.MemPressure, fv.NetAsymmetry, ) // Append service statuses (slots 35 to 35+N-1). // These are already encoded as 1/0/-1 and don't need RobustScaler. for _, svc := range e.cfg.Ingestion.SystemctlServices { safeName := strings.ReplaceAll(strings.ReplaceAll(svc, ".", "_"), "-", "_") val := fv.ServiceStatuses[safeName] fv.NormalizedVector = append(fv.NormalizedVector, val) } // Append param averages (slots 35+N onwards) for any numeric masking patterns. 
for _, name := range e.cfg.NumericPatternNames() { fv.NormalizedVector = append(fv.NormalizedVector, fv.ParamAvg[name]) } if err := e.persistFeatureVector(fv); err != nil { log.Printf("transform: persist feature vector: %v", err) } select { case e.outputChan <- *fv: default: log.Printf("transform: output channel full – dropping feature vector for window %s", fv.WindowStart.Format(time.RFC3339)) } } func (e *TransformEngine) loadToDuckDB(logs []types.LogEvent, metrics []types.MetricSnapshot, serviceStats []types.ServiceStatus) error { if len(metrics) > 0 { if err := e.loadMetricsArrow(metrics); err != nil { return fmt.Errorf("metrics: %w", err) } } if len(logs) > 0 { if err := e.loadLogs(logs); err != nil { return fmt.Errorf("logs: %w", err) } } if len(serviceStats) > 0 { if err := e.loadServiceStatus(serviceStats); err != nil { return fmt.Errorf("service status: %w", err) } } return nil } func (e *TransformEngine) loadServiceStatus(stats []types.ServiceStatus) error { tx, err := e.db.Begin() if err != nil { return err } defer tx.Rollback() stmt, err := tx.Prepare("INSERT INTO service_status (timestamp, service_name, active_state, sub_state) VALUES (?, ?, ?, ?)") if err != nil { return err } defer stmt.Close() for _, s := range stats { if _, err := stmt.Exec(s.Timestamp, s.ServiceName, s.ActiveState, s.SubState); err != nil { return err } } return tx.Commit() } // ── Arrow insert (raw_metrics) ──────────────────────────────────────────────── // loadMetricsArrow inserts MetricSnapshots into DuckDB via the Arrow // RegisterView + INSERT SELECT path (zero-copy on the Go side). // // The dedicated dbConn is closed before returning so DuckDB commits the rows // before the subsequent fusion query reads them. 
func (e *TransformEngine) loadMetricsArrow(metrics []types.MetricSnapshot) error { schema := arrow.NewSchema([]arrow.Field{ {Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ns}, {Name: "cpu_percent", Type: arrow.PrimitiveTypes.Float64}, {Name: "cpu_iowait_percent", Type: arrow.PrimitiveTypes.Float64}, {Name: "cpu_softirq_percent", Type: arrow.PrimitiveTypes.Float64}, {Name: "context_switches_s", Type: arrow.PrimitiveTypes.Float64}, {Name: "interrupts_s", Type: arrow.PrimitiveTypes.Float64}, {Name: "memory_used_mb", Type: arrow.PrimitiveTypes.Float64}, {Name: "memory_cached_mb", Type: arrow.PrimitiveTypes.Float64}, {Name: "memory_dirty_mb", Type: arrow.PrimitiveTypes.Float64}, {Name: "net_in_mbps", Type: arrow.PrimitiveTypes.Float64}, {Name: "net_out_mbps", Type: arrow.PrimitiveTypes.Float64}, {Name: "disk_read_mbps", Type: arrow.PrimitiveTypes.Float64}, {Name: "disk_write_mbps", Type: arrow.PrimitiveTypes.Float64}, {Name: "disk_read_time_s", Type: arrow.PrimitiveTypes.Float64}, {Name: "disk_write_time_s", Type: arrow.PrimitiveTypes.Float64}, {Name: "disk_io_ticks_s", Type: arrow.PrimitiveTypes.Float64}, {Name: "network_errors_s", Type: arrow.PrimitiveTypes.Float64}, {Name: "network_drops_s", Type: arrow.PrimitiveTypes.Float64}, {Name: "tcp_retrans_s", Type: arrow.PrimitiveTypes.Float64}, {Name: "tcp_timeouts_s", Type: arrow.PrimitiveTypes.Float64}, {Name: "tcp_lost_retransmit_s", Type: arrow.PrimitiveTypes.Float64}, {Name: "tcp_fast_retrans_s", Type: arrow.PrimitiveTypes.Float64}, {Name: "softnet_dropped_s", Type: arrow.PrimitiveTypes.Float64}, {Name: "softnet_time_squeeze_s", Type: arrow.PrimitiveTypes.Float64}, {Name: "net_packets_in_s", Type: arrow.PrimitiveTypes.Float64}, {Name: "net_packets_out_s", Type: arrow.PrimitiveTypes.Float64}, {Name: "disk_reads_s", Type: arrow.PrimitiveTypes.Float64}, {Name: "disk_writes_s", Type: arrow.PrimitiveTypes.Float64}, }, nil) b := array.NewRecordBuilder(e.pool, schema) defer b.Release() tsb := 
b.Field(0).(*array.TimestampBuilder) cpuF := b.Field(1).(*array.Float64Builder) iowait := b.Field(2).(*array.Float64Builder) softirq := b.Field(3).(*array.Float64Builder) ctxt := b.Field(4).(*array.Float64Builder) intr := b.Field(5).(*array.Float64Builder) memUsed := b.Field(6).(*array.Float64Builder) memCached := b.Field(7).(*array.Float64Builder) memDirty := b.Field(8).(*array.Float64Builder) netIn := b.Field(9).(*array.Float64Builder) netOut := b.Field(10).(*array.Float64Builder) diskRead := b.Field(11).(*array.Float64Builder) diskWrite := b.Field(12).(*array.Float64Builder) diskRTime := b.Field(13).(*array.Float64Builder) diskWTime := b.Field(14).(*array.Float64Builder) diskTicks := b.Field(15).(*array.Float64Builder) netErr := b.Field(16).(*array.Float64Builder) netDrop := b.Field(17).(*array.Float64Builder) tcpRetrans := b.Field(18).(*array.Float64Builder) tcpTimeout := b.Field(19).(*array.Float64Builder) tcpLost := b.Field(20).(*array.Float64Builder) tcpFast := b.Field(21).(*array.Float64Builder) softDrop := b.Field(22).(*array.Float64Builder) softSqueeze := b.Field(23).(*array.Float64Builder) pIn := b.Field(24).(*array.Float64Builder) pOut := b.Field(25).(*array.Float64Builder) dRComp := b.Field(26).(*array.Float64Builder) dWComp := b.Field(27).(*array.Float64Builder) for i := range metrics { m := &metrics[i] tsb.Append(arrow.Timestamp(m.Timestamp.UnixNano())) cpuF.Append(m.CPUPercent) iowait.Append(m.CPUIoWaitPercent) softirq.Append(m.CPUSoftIrqPercent) ctxt.Append(m.ContextSwitchesPerS) intr.Append(m.InterruptsPerS) memUsed.Append(m.MemoryUsedMB) memCached.Append(m.MemoryCachedMB) memDirty.Append(m.MemoryDirtyMB) netIn.Append(m.NetworkInMBps) netOut.Append(m.NetworkOutMBps) diskRead.Append(m.DiskReadMBps) diskWrite.Append(m.DiskWriteMBps) diskRTime.Append(m.DiskReadTimeMsPerS) diskWTime.Append(m.DiskWriteTimeMsPerS) diskTicks.Append(m.DiskIOTicksPerS) netErr.Append(m.NetErrorsPerS) netDrop.Append(m.NetDropsPerS) tcpRetrans.Append(m.TCPRetransPerS) 
tcpTimeout.Append(m.TCPTimeoutsPerS) tcpLost.Append(m.TCPLostRetransmitPerS) tcpFast.Append(m.TCPFastRetransPerS) softDrop.Append(m.SoftnetDroppedPerS) softSqueeze.Append(m.SoftnetTimeSqueezePerS) pIn.Append(m.NetPacketsInPerS) pOut.Append(m.NetPacketsOutPerS) dRComp.Append(m.DiskReadsCompletedPerS) dWComp.Append(m.DiskWritesCompletedPerS) } rec := b.NewRecordBatch() defer rec.Release() dbConn, err := e.conn.Connect(context.Background()) if err != nil { return fmt.Errorf("connect: %w", err) } var connErr error defer func() { if cerr := dbConn.Close(); cerr != nil && connErr == nil { connErr = fmt.Errorf("close arrow conn: %w", cerr) } }() arr, err := duckdb.NewArrowFromConn(dbConn) if err != nil { return fmt.Errorf("create arrow interface: %w", err) } rdr, err := array.NewRecordReader(schema, []arrow.RecordBatch{rec}) if err != nil { return fmt.Errorf("create record reader: %w", err) } defer rdr.Release() releaseView, err := arr.RegisterView(rdr, "temp_metrics_arrow_view") if err != nil { return fmt.Errorf("register arrow view: %w", err) } defer releaseView() res, err := arr.QueryContext(context.Background(), "INSERT INTO raw_metrics SELECT * FROM temp_metrics_arrow_view") if err != nil { return fmt.Errorf("insert from arrow view: %w", err) } if res != nil { res.Release() } return connErr } // ── SQL insert (log_events + log_params) ────────────────────────────────────── func (e *TransformEngine) loadLogs(logs []types.LogEvent) error { if len(logs) == 0 { return nil } if err := e.loadLogEventsArrow(logs); err != nil { return fmt.Errorf("load log events arrow: %w", err) } if len(e.paramNames) > 0 { if err := e.loadLogParamsArrow(logs); err != nil { return fmt.Errorf("load log params arrow: %w", err) } } return nil } func (e *TransformEngine) loadLogEventsArrow(logs []types.LogEvent) error { schema := arrow.NewSchema([]arrow.Field{ {Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ns}, {Name: "template_id", Type: arrow.PrimitiveTypes.Int32}, {Name: 
"severity", Type: arrow.BinaryTypes.String}, {Name: "parameters", Type: arrow.BinaryTypes.String}, }, nil) b := array.NewRecordBuilder(e.pool, schema) defer b.Release() tsb := b.Field(0).(*array.TimestampBuilder) tid := b.Field(1).(*array.Int32Builder) sev := b.Field(2).(*array.StringBuilder) prm := b.Field(3).(*array.StringBuilder) for i := range logs { l := &logs[i] tsb.Append(arrow.Timestamp(l.Timestamp.UnixNano())) tid.Append(int32(l.TemplateID)) sev.Append(l.Severity) paramJSON, jerr := json.Marshal(l.Params) if jerr != nil { prm.Append("{}") } else { prm.Append(string(paramJSON)) } } rec := b.NewRecordBatch() defer rec.Release() return e.insertArrowRecord(schema, rec, "temp_logs_arrow_view", "INSERT INTO log_events SELECT * FROM temp_logs_arrow_view") } func (e *TransformEngine) loadLogParamsArrow(logs []types.LogEvent) error { fields := []arrow.Field{ {Name: "event_time", Type: arrow.FixedWidthTypes.Timestamp_ns}, } for _, name := range e.paramNames { var typ arrow.DataType = arrow.BinaryTypes.String switch e.patternTypes[name] { case "float": typ = arrow.PrimitiveTypes.Float64 case "int": typ = arrow.PrimitiveTypes.Int64 } fields = append(fields, arrow.Field{Name: "param_" + name, Type: typ}) } schema := arrow.NewSchema(fields, nil) b := array.NewRecordBuilder(e.pool, schema) defer b.Release() tsb := b.Field(0).(*array.TimestampBuilder) rowCount := 0 for i := range logs { l := &logs[i] if len(l.Params) == 0 { continue } // Skip log events that carry none of the configured param names. 
hasValue := false for _, name := range e.paramNames { if _, ok := l.Params[name]; ok { hasValue = true break } } if !hasValue { continue } tsb.Append(arrow.Timestamp(l.Timestamp.UnixNano())) for j, name := range e.paramNames { raw, ok := l.Params[name] if !ok { b.Field(j + 1).AppendNull() continue } raw = strings.TrimSpace(raw) switch e.patternTypes[name] { case "float": f, err := strconv.ParseFloat(raw, 64) if err == nil { b.Field(j + 1).(*array.Float64Builder).Append(f) } else { log.Printf("transform: cannot parse float param %q=%q", name, raw) b.Field(j + 1).AppendNull() } case "int": iv, err := strconv.ParseInt(raw, 10, 64) if err == nil { b.Field(j + 1).(*array.Int64Builder).Append(iv) } else { log.Printf("transform: cannot parse int param %q=%q", name, raw) b.Field(j + 1).AppendNull() } default: b.Field(j + 1).(*array.StringBuilder).Append(raw) } } rowCount++ } if rowCount == 0 { return nil } rec := b.NewRecordBatch() defer rec.Release() cols := make([]string, 0, 1+len(e.paramNames)) cols = append(cols, "event_time") for _, name := range e.paramNames { cols = append(cols, "param_"+name) } query := fmt.Sprintf( "INSERT INTO log_params (%s) SELECT * FROM temp_params_arrow_view", strings.Join(cols, ", "), ) return e.insertArrowRecord(schema, rec, "temp_params_arrow_view", query) } // insertArrowRecord registers rec as a DuckDB view named viewName and executes // insertQuery against it. The connection is closed before returning to ensure // DuckDB commits the rows prior to the next read. 
func (e *TransformEngine) insertArrowRecord( schema *arrow.Schema, rec arrow.RecordBatch, viewName, insertQuery string, ) error { dbConn, err := e.conn.Connect(context.Background()) if err != nil { return fmt.Errorf("connect: %w", err) } var connErr error defer func() { if cerr := dbConn.Close(); cerr != nil && connErr == nil { connErr = fmt.Errorf("close arrow conn: %w", cerr) } }() arr, err := duckdb.NewArrowFromConn(dbConn) if err != nil { return fmt.Errorf("create arrow interface: %w", err) } rdr, err := array.NewRecordReader(schema, []arrow.RecordBatch{rec}) if err != nil { return fmt.Errorf("create record reader: %w", err) } defer rdr.Release() releaseView, err := arr.RegisterView(rdr, viewName) if err != nil { return fmt.Errorf("register arrow view %s: %w", viewName, err) } defer releaseView() res, err := arr.QueryContext(context.Background(), insertQuery) if err != nil { return fmt.Errorf("insert from arrow view %s: %w", viewName, err) } if res != nil { res.Release() } return connErr } // ── fusion query ────────────────────────────────────────────────────────────── // runFusionQuery executes the pre-compiled fusion query and maps the result // row into a FeatureVector. // // Slots populated here: // - struct fields (raw aggregates) for feature engineering and feature_json // - NormalizedVector[0..22]: sc_* DuckDB RobustScaler output // - ParamAvg map: avg_param_* columns // // Slots 23+ of NormalizedVector are populated by processWindow after this call. 
func (e *TransformEngine) runFusionQuery() (*types.FeatureVector, error) { rows, err := e.db.Query(e.fusionQuery) if err != nil { return nil, fmt.Errorf("execute: %w", err) } defer rows.Close() if !rows.Next() { return nil, rows.Err() } cols, err := rows.Columns() if err != nil { return nil, fmt.Errorf("columns: %w", err) } dest := make([]any, len(cols)) ptrs := make([]any, len(cols)) for i := range dest { ptrs[i] = &dest[i] } if err := rows.Scan(ptrs...); err != nil { return nil, fmt.Errorf("scan: %w", err) } fv := &types.FeatureVector{ ParamAvg: make(map[string]float64), ServiceStatuses: make(map[string]float64), } // Additional raw sums for derived features var sumDiskRTime, sumDiskWTime, sumDiskReads, sumDiskWrites, sumPacketsOut float64 for i, col := range cols { if dest[i] == nil { continue } switch col { case "ws": if t, ok := dest[i].(time.Time); ok { fv.WindowStart = t } case "avg_cpu": fv.AvgCPUPercent = toFloat64(dest[i]) case "max_cpu": fv.MaxCPUPercent = toFloat64(dest[i]) case "std_cpu": fv.StdCPUPercent = toFloat64(dest[i]) case "avg_iowait": fv.AvgCPUIoWait = toFloat64(dest[i]) case "std_iowait": fv.StdCPUIoWait = toFloat64(dest[i]) case "avg_softirq": fv.AvgCPUSoftIrq = toFloat64(dest[i]) case "avg_ctx_switches": fv.AvgCtxSwitches = toFloat64(dest[i]) case "avg_interrupts": fv.AvgInterrupts = toFloat64(dest[i]) case "avg_softnet_dropped": fv.AvgSoftnetDropped = toFloat64(dest[i]) case "avg_softnet_squeeze": fv.AvgSoftnetSqueeze = toFloat64(dest[i]) case "avg_mem_used": fv.AvgMemUsedMB = toFloat64(dest[i]) case "avg_mem_cached": fv.AvgMemCachedMB = toFloat64(dest[i]) case "max_mem_dirty": fv.MaxMemDirtyMB = toFloat64(dest[i]) case "avg_net_in": fv.AvgNetInMBps = toFloat64(dest[i]) case "std_net_in": fv.StdNetInMBps = toFloat64(dest[i]) case "avg_net_out": fv.AvgNetOutMBps = toFloat64(dest[i]) case "std_net_out": fv.StdNetOutMBps = toFloat64(dest[i]) case "avg_net_drops": fv.AvgNetDrops = toFloat64(dest[i]) case "sum_tcp_retrans": fv.SumTCPRetrans = 
toFloat64(dest[i]) case "sum_tcp_fast_retrans": fv.SumTCPFastRetrans = toFloat64(dest[i]) case "sum_tcp_timeouts": fv.SumTCPTimeouts = toFloat64(dest[i]) case "avg_disk_read": fv.AvgDiskReadMBps = toFloat64(dest[i]) case "avg_disk_write": fv.AvgDiskWriteMBps = toFloat64(dest[i]) case "avg_disk_io_ticks": fv.AvgDiskIOTicks = toFloat64(dest[i]) case "std_disk_io_ticks": fv.StdDiskIOTicks = toFloat64(dest[i]) case "error_count": fv.ErrorCount = int(toInt64(dest[i])) case "severity_score": fv.SeverityScore = toFloat64(dest[i]) case "log_event_count": fv.LogCountTotal = int(toInt64(dest[i])) case "unique_templates": fv.UniqueTemplates = int(toInt64(dest[i])) case "sum_disk_read_time": sumDiskRTime = toFloat64(dest[i]) case "sum_disk_write_time": sumDiskWTime = toFloat64(dest[i]) case "sum_disk_reads": sumDiskReads = toFloat64(dest[i]) case "sum_disk_writes": sumDiskWrites = toFloat64(dest[i]) case "sum_packets_out": sumPacketsOut = toFloat64(dest[i]) default: if strings.HasPrefix(col, "avg_param_") { name := strings.TrimPrefix(col, "avg_param_") fv.ParamAvg[name] = toFloat64(dest[i]) } if strings.HasPrefix(col, "svc_") { name := strings.TrimPrefix(col, "svc_") fv.ServiceStatuses[name] = toFloat64(dest[i]) } } } // Derived calculations from raw sums fv.AvgNetThroughput = fv.AvgNetInMBps + fv.AvgNetOutMBps fv.AvgDiskLatencyMS = (sumDiskRTime + sumDiskWTime) / (sumDiskReads + sumDiskWrites + 1e-3) fv.RetransPerPacket = fv.SumTCPRetrans / (sumPacketsOut + 1e-3) // Build NormalizedVector from sc_* columns in canonical feature order. 
scaledMap := make(map[string]float64) for i, col := range cols { if dest[i] != nil && strings.HasPrefix(col, "sc_") { scaledMap[col] = toFloat64(dest[i]) } } scOrder := make([]string, 0, len(scalerFeatureNames)) for _, name := range scalerFeatureNames { scOrder = append(scOrder, "sc_"+name) } scaled := make([]float64, len(scOrder)) for i, key := range scOrder { scaled[i] = scaledMap[key] } fv.NormalizedVector = scaled return fv, nil } // ── persistence ─────────────────────────────────────────────────────────────── func (e *TransformEngine) persistFeatureVector(fv *types.FeatureVector) error { featureMap := fv.ToNamedMap(e.cfg.NumericPatternNames()) jsonBytes, err := json.Marshal(featureMap) if err != nil { return fmt.Errorf("marshal feature json: %w", err) } if _, err := e.db.Exec(e.featureInsertSQL, fv.WindowStart, string(jsonBytes)); err != nil { return fmt.Errorf("insert features row: %w", err) } return nil } // UpdateAnomalyScore back-fills the anomaly score for a completed window. // Called by the detection stage after scoring the FeatureVector. func (e *TransformEngine) UpdateAnomalyScore(window time.Time, score float64) { const q = `UPDATE features SET anomaly_score = ? WHERE window_start = ?` if _, err := e.db.Exec(q, score, window); err != nil { log.Printf("transform: UpdateAnomalyScore window=%s: %v", window.Format(time.RFC3339), err) } } // ── health ──────────────────────────────────────────────────────────────────── func (e *TransformEngine) emitHealth() { e.mu.Lock() p := e.processed l := e.avgLatency e.mu.Unlock() select { case e.healthChan <- types.StageHealth{ StageName: "transform_engine", EventsProcessed: p, AvgLatencyMs: l, LastUpdate: time.Now(), }: default: } } // ── type helpers ────────────────────────────────────────────────────────────── // toFloat64 converts DuckDB result values to float64 for field assignment. // Handles the concrete types returned by the duckdb-go driver. // Returns 0 for any unsupported type. 
func toFloat64(v any) float64 {
	// duckdb-go scans each DuckDB type into its own Go type. Narrow integer
	// columns (INTEGER, SMALLINT, TINYINT and unsigned variants, e.g. CASE
	// expressions over integer literals) arrive as int32/int16/int8/uint*.
	// HUGEINT arrives as *big.Int; DuckDB uses it e.g. for SUM over integers.
	// Previously these all fell through to 0.
	switch t := v.(type) {
	case float64:
		return t
	case float32:
		return float64(t)
	case int64:
		return float64(t)
	case int32:
		return float64(t)
	case int16:
		return float64(t)
	case int8:
		return float64(t)
	case int:
		return float64(t)
	case uint64:
		return float64(t)
	case uint32:
		return float64(t)
	case uint16:
		return float64(t)
	case uint8:
		return float64(t)
	case *big.Int:
		if t == nil {
			return 0
		}
		f, _ := new(big.Float).SetInt(t).Float64()
		return f
	}
	return 0
}

// toInt64 converts DuckDB result values to int64. Floats are truncated toward
// zero; integers outside the int64 range (uint64, *big.Int) are clamped to
// math.MaxInt64 / math.MinInt64. Returns 0 for unsupported types.
func toInt64(v any) int64 {
	switch t := v.(type) {
	case int64:
		return t
	case int32:
		return int64(t)
	case int16:
		return int64(t)
	case int8:
		return int64(t)
	case int:
		return int64(t)
	case uint64:
		if t > math.MaxInt64 {
			return math.MaxInt64
		}
		return int64(t)
	case uint32:
		return int64(t)
	case uint16:
		return int64(t)
	case uint8:
		return int64(t)
	case float64:
		return int64(t)
	case float32:
		return int64(t)
	case *big.Int:
		switch {
		case t == nil:
			return 0
		case t.IsInt64():
			return t.Int64()
		case t.Sign() > 0:
			return math.MaxInt64
		default:
			return math.MinInt64
		}
	}
	return 0
}

// calcStdDev returns the sample standard deviation (Bessel-corrected) of data.
// Returns 0 when fewer than two data points are provided.
func calcStdDev(data []float64) float64 {
	n := float64(len(data))
	if n < 2 {
		return 0.0
	}

	var sum float64
	for _, v := range data {
		sum += v
	}
	mean := sum / n

	// Two-pass: accumulate squared deviations from the mean.
	var variance float64
	for _, v := range data {
		d := v - mean
		variance += d * d
	}
	return math.Sqrt(variance / (n - 1))
}