// Package transform contains the DuckDB-backed Tumbling Window Engine. package transform import ( "fmt" "strings" "codeberg.org/pata1704/guenther/internal/config" ) // they are derived from already-scaled inputs or are ratio/delta features). var scalerFeatureNames = []string{ // CPU (3) "avg_cpu", "max_cpu", "std_cpu", // System/Kernel (7) "avg_iowait", "std_iowait", "avg_softirq", "avg_ctx_switches", "avg_interrupts", "avg_softnet_dropped", "avg_softnet_squeeze", // Network (8) "avg_net_in", "std_net_in", "avg_net_out", "std_net_out", "sum_tcp_retrans", "sum_tcp_fast_retrans", "sum_tcp_timeouts", "avg_net_drops", // Disk (4) "avg_disk_read", "avg_disk_write", "avg_disk_io_ticks", "std_disk_io_ticks", // Log (2) "error_count", "severity_score", } // ScalerFeatureNames returns the ordered list of feature names stored in // scaler_params. func ScalerFeatureNames() []string { return scalerFeatureNames } func BuildScalerParamsTable() string { return `CREATE TABLE IF NOT EXISTS scaler_params ( feature_name VARCHAR PRIMARY KEY, mean DOUBLE NOT NULL, std DOUBLE NOT NULL )` } func BuildFitScalerQuery() string { return ` INSERT OR REPLACE INTO scaler_params (feature_name, mean, std) WITH stats AS ( SELECT -- CPU PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY cpu_percent) AS m_avg_cpu, PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY cpu_percent) AS m_max_cpu, -- Approximation 0.0 AS m_std_cpu, -- Baseline std is often 0 or low -- System PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY cpu_iowait_percent) AS m_avg_iowait, 0.0 AS m_std_iowait, PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY cpu_softirq_percent) AS m_avg_softirq, PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY context_switches_s) AS m_avg_ctx_switches, PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY interrupts_s) AS m_avg_interrupts, PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY softnet_dropped_s) AS m_avg_softnet_dropped, PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY softnet_time_squeeze_s) AS m_avg_softnet_squeeze, -- Network PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY net_in_mbps) AS m_avg_net_in, 0.0 AS m_std_net_in, PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY net_out_mbps) AS m_avg_net_out, 0.0 AS m_std_net_out, PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY tcp_retrans_s) AS m_sum_tcp_retrans, PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY tcp_fast_retrans_s) AS m_sum_tcp_fast_retrans, PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY tcp_timeouts_s) AS m_sum_tcp_timeouts, PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY network_drops_s) AS m_avg_net_drops, -- Disk PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY disk_read_mbps) AS m_avg_disk_read, PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY disk_write_mbps) AS m_avg_disk_write, PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY disk_io_ticks_s) AS m_avg_disk_io_ticks, 0.0 AS m_std_disk_io_ticks, -- IQRs for scaling (PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY cpu_percent) - PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY cpu_percent)) AS s_avg_cpu, (PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY cpu_iowait_percent) - PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY cpu_iowait_percent)) AS s_avg_iowait, (PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY net_in_mbps) - PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY net_in_mbps)) AS s_avg_net_in, (PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY net_out_mbps) - PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY net_out_mbps)) AS s_avg_net_out, (PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY disk_io_ticks_s) - PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY disk_io_ticks_s)) AS s_avg_disk_io_ticks FROM raw_metrics WHERE timestamp >= $1 AND timestamp < $2 ) SELECT feature_name, mean, std FROM ( SELECT 'avg_cpu' AS feature_name, s.m_avg_cpu AS mean, GREATEST(s.s_avg_cpu, 1e-9) AS std FROM stats s UNION ALL SELECT 'max_cpu', s.m_max_cpu, GREATEST(s.s_avg_cpu, 1e-9) FROM stats s UNION ALL SELECT 'std_cpu', 0.0, 1.0 FROM stats s UNION ALL SELECT 'avg_iowait', s.m_avg_iowait, GREATEST(s.s_avg_iowait, 1e-9) FROM stats s UNION ALL SELECT 'std_iowait', 0.0, 1.0 FROM stats s UNION ALL SELECT 'avg_softirq', s.m_avg_softirq, 1.0 FROM stats s UNION ALL SELECT 'avg_ctx_switches', s.m_avg_ctx_switches, 1.0 FROM stats s UNION ALL SELECT 'avg_interrupts', s.m_avg_interrupts, 1.0 FROM stats s UNION ALL SELECT 'avg_softnet_dropped', s.m_avg_softnet_dropped, 1.0 FROM stats s UNION ALL SELECT 'avg_softnet_squeeze', s.m_avg_softnet_squeeze, 1.0 FROM stats s UNION ALL SELECT 'avg_net_in', s.m_avg_net_in, GREATEST(s.s_avg_net_in, 1e-9) FROM stats s UNION ALL SELECT 'std_net_in', 0.0, 1.0 FROM stats s UNION ALL SELECT 'avg_net_out', s.m_avg_net_out, GREATEST(s.s_avg_net_out, 1e-9) FROM stats s UNION ALL SELECT 'std_net_out', 0.0, 1.0 FROM stats s UNION ALL SELECT 'sum_tcp_retrans', s.m_sum_tcp_retrans, 1.0 FROM stats s UNION ALL SELECT 'sum_tcp_fast_retrans', s.m_sum_tcp_fast_retrans, 1.0 FROM stats s UNION ALL SELECT 'sum_tcp_timeouts', s.m_sum_tcp_timeouts, 1.0 FROM stats s UNION ALL SELECT 'avg_net_drops', s.m_avg_net_drops, 1.0 FROM stats s UNION ALL SELECT 'avg_disk_read', s.m_avg_disk_read, 1.0 FROM stats s UNION ALL SELECT 'avg_disk_write', s.m_avg_disk_write, 1.0 FROM stats s UNION ALL SELECT 'avg_disk_io_ticks', s.m_avg_disk_io_ticks, GREATEST(s.s_avg_disk_io_ticks, 1e-9) FROM stats s UNION ALL SELECT 'std_disk_io_ticks', 0.0, 1.0 FROM stats s UNION ALL SELECT 'error_count', 0.0, 1.0 UNION ALL SELECT 'severity_score', 0.0, 1.0 ) t` } func BuildFusionQuery(maskingPatterns []config.MaskingPattern, systemctlServices []string, windowInterval string) string { numericCols := collectNumericCols(maskingPatterns) paramCTE := "" paramSelect := "" paramJoin := "" if len(numericCols) > 0 { var aggs []string for _, col := range numericCols { aggs = append(aggs, fmt.Sprintf("AVG(%s) AS avg_%s", col, col)) paramSelect += fmt.Sprintf(", COALESCE(p.avg_%s, 0.0) AS avg_%s", col, col) } paramCTE = fmt.Sprintf(`, param_agg AS (SELECT time_bucket(INTERVAL '%s', event_time) AS ws, %s FROM log_params GROUP BY 1)`, windowInterval, strings.Join(aggs, ", ")) paramJoin = "LEFT JOIN param_agg p ON m.ws = p.ws" } svcCTE := "" svcSelect := "" svcJoin := "" if len(systemctlServices) > 0 { var svcAggs []string for _, svc := range systemctlServices { safeName := strings.ReplaceAll(strings.ReplaceAll(svc, ".", "_"), "-", "_") svcAggs = append(svcAggs, fmt.Sprintf(`MODE(CASE WHEN active_state = 'active' THEN 1 WHEN active_state = 'failed' THEN -1 ELSE 0 END) AS state_%s`, safeName)) svcSelect += fmt.Sprintf(", COALESCE(s.state_%s, 0) AS svc_%s", safeName, safeName) } svcCTE = fmt.Sprintf(`, svc_agg AS (SELECT time_bucket(INTERVAL '%s', timestamp) AS ws, %s FROM service_status GROUP BY 1)`, windowInterval, strings.Join(svcAggs, ", ")) svcJoin = "LEFT JOIN svc_agg s ON m.ws = s.ws" } var scFields []string for _, name := range scalerFeatureNames { scFields = append(scFields, fmt.Sprintf("COALESCE(MAX(CASE WHEN feature_name='%s' THEN mean END),0) AS m_%s, COALESCE(MAX(CASE WHEN feature_name='%s' THEN std END),1) AS s_%s", name, name, name, name)) } var normVecFields []string for _, name := range scalerFeatureNames { // DuckDB aggregation aliases match these exactly (see metric_agg and log_agg below) src := name if name == "severity_score" || name == "error_count" { src = "l." + name } else { src = "m." + name } normVecFields = append(normVecFields, fmt.Sprintf("(COALESCE(%s, 0.0) - sc.m_%s) / sc.s_%s AS sc_%s", src, name, name, name)) } return fmt.Sprintf(` WITH metric_agg AS ( SELECT time_bucket(INTERVAL '%[1]s', timestamp) AS ws, AVG(cpu_percent) AS avg_cpu, MAX(cpu_percent) AS max_cpu, STDDEV_SAMP(cpu_percent) AS std_cpu, AVG(cpu_iowait_percent) AS avg_iowait, STDDEV_SAMP(cpu_iowait_percent) AS std_iowait, AVG(cpu_softirq_percent) AS avg_softirq, AVG(context_switches_s) AS avg_ctx_switches, AVG(interrupts_s) AS avg_interrupts, AVG(softnet_dropped_s) AS avg_softnet_dropped, AVG(softnet_time_squeeze_s) AS avg_softnet_squeeze, AVG(memory_used_mb) AS avg_mem_used, AVG(memory_cached_mb) AS avg_mem_cached, MAX(memory_dirty_mb) AS max_mem_dirty, AVG(net_in_mbps) AS avg_net_in, STDDEV_SAMP(net_in_mbps) AS std_net_in, AVG(net_out_mbps) AS avg_net_out, STDDEV_SAMP(net_out_mbps) AS std_net_out, SUM(tcp_retrans_s) AS sum_tcp_retrans, SUM(tcp_fast_retrans_s) AS sum_tcp_fast_retrans, SUM(tcp_timeouts_s) AS sum_tcp_timeouts, AVG(network_drops_s) AS avg_net_drops, AVG(disk_read_mbps) AS avg_disk_read, AVG(disk_write_mbps) AS avg_disk_write, AVG(disk_io_ticks_s) AS avg_disk_io_ticks, STDDEV_SAMP(disk_io_ticks_s) AS std_disk_io_ticks, SUM(disk_read_time_s) AS sum_disk_read_time, SUM(disk_write_time_s) AS sum_disk_write_time, SUM(disk_reads_s) AS sum_disk_reads, SUM(disk_writes_s) AS sum_disk_writes, SUM(net_packets_in_s) AS sum_packets_in, SUM(net_packets_out_s) AS sum_packets_out FROM raw_metrics GROUP BY 1 ), log_agg AS ( SELECT time_bucket(INTERVAL '%[1]s', timestamp) AS ws, COUNT(*) AS log_event_count, COUNT(DISTINCT template_id) AS unique_templates, SUM(CASE WHEN severity = 'ERROR' THEN 1 ELSE 0 END) AS error_count, SUM(CASE WHEN severity = 'ERROR' THEN 10 WHEN severity = 'WARN' THEN 3 ELSE 1 END) AS severity_score FROM log_events GROUP BY 1 )%[2]s%[3]s, scaler AS ( SELECT %[4]s FROM scaler_params ) SELECT m.ws, m.*, l.log_event_count, l.unique_templates, l.error_count, l.severity_score%[5]s%[6]s, %[7]s FROM metric_agg m LEFT JOIN log_agg l ON m.ws = l.ws %[8]s %[9]s CROSS JOIN scaler sc ORDER BY m.ws DESC LIMIT 1`, windowInterval, paramCTE, svcCTE, strings.Join(scFields, ", "), paramSelect, svcSelect, strings.Join(normVecFields, ", "), paramJoin, svcJoin) } func BuildLogParamsSchema(patterns []config.MaskingPattern) string { cols := []string{"event_time TIMESTAMP WITH TIME ZONE"} for _, mp := range patterns { if mp.Name == "" { continue } cols = append(cols, fmt.Sprintf("param_%s %s", mp.Name, sqlType(mp.Type))) } return fmt.Sprintf("CREATE TABLE IF NOT EXISTS log_params (\n\t%s\n)", strings.Join(cols, ",\n\t")) } func sqlType(t string) string { switch t { case "float": return "DOUBLE" case "int": return "BIGINT" default: return "VARCHAR" } } func collectNumericCols(patterns []config.MaskingPattern) []string { var cols []string for _, mp := range patterns { if mp.Name == "" || mp.Type == "string" { continue } cols = append(cols, "param_"+mp.Name) } return cols }