230 lines
10 KiB
Go
230 lines
10 KiB
Go
// Package transform contains the DuckDB-backed Tumbling Window Engine.
|
|
package transform
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
|
|
"codeberg.org/pata1704/guenther/internal/config"
|
|
)
|
|
|
|
// scalerFeatureNames lists, in order, the features that get a row in
// scaler_params. BuildFusionQuery walks this list to emit one normalised
// sc_<name> column per entry, so the order here is the order of those columns.
//
// Features that are absent from this list are not part of the scaler (per the
// original note: they are derived from already-scaled inputs or are
// ratio/delta features).
var scalerFeatureNames = []string{
	// CPU (3)
	"avg_cpu", "max_cpu", "std_cpu",
	// System/Kernel (7)
	"avg_iowait", "std_iowait", "avg_softirq", "avg_ctx_switches", "avg_interrupts", "avg_softnet_dropped", "avg_softnet_squeeze",
	// Network (8)
	"avg_net_in", "std_net_in", "avg_net_out", "std_net_out", "sum_tcp_retrans", "sum_tcp_fast_retrans", "sum_tcp_timeouts", "avg_net_drops",
	// Disk (4)
	"avg_disk_read", "avg_disk_write", "avg_disk_io_ticks", "std_disk_io_ticks",
	// Log (2)
	"error_count", "severity_score",
}

// ScalerFeatureNames returns the ordered list of feature names stored in
// scaler_params.
//
// The result is a fresh copy: callers may sort or modify it without affecting
// the package-level list that the query builders rely on.
func ScalerFeatureNames() []string {
	return append([]string(nil), scalerFeatureNames...)
}
|
|
|
|
// BuildScalerParamsTable returns the DDL that creates the scaler_params table
// if it does not exist yet. The table holds one row per feature, keyed by
// feature_name, with non-null mean and std columns.
func BuildScalerParamsTable() string {
	const ddl = `CREATE TABLE IF NOT EXISTS scaler_params (
	feature_name VARCHAR PRIMARY KEY,
	mean DOUBLE NOT NULL,
	std DOUBLE NOT NULL
)`
	return ddl
}
|
|
|
|
// BuildFitScalerQuery returns the statement that (re)fits the scaler and
// upserts one row per scaler feature into scaler_params.
//
// The statement takes two positional parameters bounding the fit window on
// raw_metrics: $1 (inclusive lower timestamp) and $2 (exclusive upper
// timestamp).
//
// What gets stored, as written in the SQL below:
//   - "mean" is the median (PERCENTILE_CONT(0.5)) of the underlying raw
//     column; max_cpu reuses the cpu_percent median as an approximation.
//   - "std" is the inter-quartile range (p75 - p25), floored at 1e-9, for the
//     cpu, iowait, net_in, net_out and disk_io_ticks features; every other
//     feature is stored with std = 1.0.
//   - the std_* features and the two log features (error_count,
//     severity_score) are stored as mean = 0.0, std = 1.0.
func BuildFitScalerQuery() string {
	const fitScalerSQL = `
INSERT OR REPLACE INTO scaler_params (feature_name, mean, std)
WITH stats AS (
	SELECT
		-- CPU
		PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY cpu_percent) AS m_avg_cpu,
		PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY cpu_percent) AS m_max_cpu, -- Approximation
		0.0 AS m_std_cpu, -- Baseline std is often 0 or low
		-- System
		PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY cpu_iowait_percent) AS m_avg_iowait,
		0.0 AS m_std_iowait,
		PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY cpu_softirq_percent) AS m_avg_softirq,
		PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY context_switches_s) AS m_avg_ctx_switches,
		PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY interrupts_s) AS m_avg_interrupts,
		PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY softnet_dropped_s) AS m_avg_softnet_dropped,
		PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY softnet_time_squeeze_s) AS m_avg_softnet_squeeze,
		-- Network
		PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY net_in_mbps) AS m_avg_net_in,
		0.0 AS m_std_net_in,
		PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY net_out_mbps) AS m_avg_net_out,
		0.0 AS m_std_net_out,
		PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY tcp_retrans_s) AS m_sum_tcp_retrans,
		PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY tcp_fast_retrans_s) AS m_sum_tcp_fast_retrans,
		PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY tcp_timeouts_s) AS m_sum_tcp_timeouts,
		PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY network_drops_s) AS m_avg_net_drops,
		-- Disk
		PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY disk_read_mbps) AS m_avg_disk_read,
		PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY disk_write_mbps) AS m_avg_disk_write,
		PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY disk_io_ticks_s) AS m_avg_disk_io_ticks,
		0.0 AS m_std_disk_io_ticks,

		-- IQRs for scaling
		(PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY cpu_percent) - PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY cpu_percent)) AS s_avg_cpu,
		(PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY cpu_iowait_percent) - PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY cpu_iowait_percent)) AS s_avg_iowait,
		(PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY net_in_mbps) - PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY net_in_mbps)) AS s_avg_net_in,
		(PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY net_out_mbps) - PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY net_out_mbps)) AS s_avg_net_out,
		(PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY disk_io_ticks_s) - PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY disk_io_ticks_s)) AS s_avg_disk_io_ticks
	FROM raw_metrics
	WHERE timestamp >= $1 AND timestamp < $2
)
SELECT feature_name, mean, std FROM (
	SELECT 'avg_cpu' AS feature_name, s.m_avg_cpu AS mean, GREATEST(s.s_avg_cpu, 1e-9) AS std FROM stats s UNION ALL
	SELECT 'max_cpu', s.m_max_cpu, GREATEST(s.s_avg_cpu, 1e-9) FROM stats s UNION ALL
	SELECT 'std_cpu', 0.0, 1.0 FROM stats s UNION ALL
	SELECT 'avg_iowait', s.m_avg_iowait, GREATEST(s.s_avg_iowait, 1e-9) FROM stats s UNION ALL
	SELECT 'std_iowait', 0.0, 1.0 FROM stats s UNION ALL
	SELECT 'avg_softirq', s.m_avg_softirq, 1.0 FROM stats s UNION ALL
	SELECT 'avg_ctx_switches', s.m_avg_ctx_switches, 1.0 FROM stats s UNION ALL
	SELECT 'avg_interrupts', s.m_avg_interrupts, 1.0 FROM stats s UNION ALL
	SELECT 'avg_softnet_dropped', s.m_avg_softnet_dropped, 1.0 FROM stats s UNION ALL
	SELECT 'avg_softnet_squeeze', s.m_avg_softnet_squeeze, 1.0 FROM stats s UNION ALL
	SELECT 'avg_net_in', s.m_avg_net_in, GREATEST(s.s_avg_net_in, 1e-9) FROM stats s UNION ALL
	SELECT 'std_net_in', 0.0, 1.0 FROM stats s UNION ALL
	SELECT 'avg_net_out', s.m_avg_net_out, GREATEST(s.s_avg_net_out, 1e-9) FROM stats s UNION ALL
	SELECT 'std_net_out', 0.0, 1.0 FROM stats s UNION ALL
	SELECT 'sum_tcp_retrans', s.m_sum_tcp_retrans, 1.0 FROM stats s UNION ALL
	SELECT 'sum_tcp_fast_retrans', s.m_sum_tcp_fast_retrans, 1.0 FROM stats s UNION ALL
	SELECT 'sum_tcp_timeouts', s.m_sum_tcp_timeouts, 1.0 FROM stats s UNION ALL
	SELECT 'avg_net_drops', s.m_avg_net_drops, 1.0 FROM stats s UNION ALL
	SELECT 'avg_disk_read', s.m_avg_disk_read, 1.0 FROM stats s UNION ALL
	SELECT 'avg_disk_write', s.m_avg_disk_write, 1.0 FROM stats s UNION ALL
	SELECT 'avg_disk_io_ticks', s.m_avg_disk_io_ticks, GREATEST(s.s_avg_disk_io_ticks, 1e-9) FROM stats s UNION ALL
	SELECT 'std_disk_io_ticks', 0.0, 1.0 FROM stats s UNION ALL
	SELECT 'error_count', 0.0, 1.0 UNION ALL
	SELECT 'severity_score', 0.0, 1.0
) t`
	return fitScalerSQL
}
|
|
|
|
func BuildFusionQuery(maskingPatterns []config.MaskingPattern, systemctlServices []string, windowInterval string) string {
|
|
numericCols := collectNumericCols(maskingPatterns)
|
|
paramCTE := ""
|
|
paramSelect := ""
|
|
paramJoin := ""
|
|
if len(numericCols) > 0 {
|
|
var aggs []string
|
|
for _, col := range numericCols {
|
|
aggs = append(aggs, fmt.Sprintf("AVG(%s) AS avg_%s", col, col))
|
|
paramSelect += fmt.Sprintf(", COALESCE(p.avg_%s, 0.0) AS avg_%s", col, col)
|
|
}
|
|
paramCTE = fmt.Sprintf(`, param_agg AS (SELECT time_bucket(INTERVAL '%s', event_time) AS ws, %s FROM log_params GROUP BY 1)`, windowInterval, strings.Join(aggs, ", "))
|
|
paramJoin = "LEFT JOIN param_agg p ON m.ws = p.ws"
|
|
}
|
|
|
|
svcCTE := ""
|
|
svcSelect := ""
|
|
svcJoin := ""
|
|
if len(systemctlServices) > 0 {
|
|
var svcAggs []string
|
|
for _, svc := range systemctlServices {
|
|
safeName := strings.ReplaceAll(strings.ReplaceAll(svc, ".", "_"), "-", "_")
|
|
svcAggs = append(svcAggs, fmt.Sprintf(`MODE(CASE WHEN active_state = 'active' THEN 1 WHEN active_state = 'failed' THEN -1 ELSE 0 END) AS state_%s`, safeName))
|
|
svcSelect += fmt.Sprintf(", COALESCE(s.state_%s, 0) AS svc_%s", safeName, safeName)
|
|
}
|
|
svcCTE = fmt.Sprintf(`, svc_agg AS (SELECT time_bucket(INTERVAL '%s', timestamp) AS ws, %s FROM service_status GROUP BY 1)`, windowInterval, strings.Join(svcAggs, ", "))
|
|
svcJoin = "LEFT JOIN svc_agg s ON m.ws = s.ws"
|
|
}
|
|
|
|
var scFields []string
|
|
for _, name := range scalerFeatureNames {
|
|
scFields = append(scFields, fmt.Sprintf("COALESCE(MAX(CASE WHEN feature_name='%s' THEN mean END),0) AS m_%s, COALESCE(MAX(CASE WHEN feature_name='%s' THEN std END),1) AS s_%s", name, name, name, name))
|
|
}
|
|
|
|
var normVecFields []string
|
|
for _, name := range scalerFeatureNames {
|
|
// DuckDB aggregation aliases match these exactly (see metric_agg and log_agg below)
|
|
src := name
|
|
if name == "severity_score" || name == "error_count" {
|
|
src = "l." + name
|
|
} else {
|
|
src = "m." + name
|
|
}
|
|
normVecFields = append(normVecFields, fmt.Sprintf("(COALESCE(%s, 0.0) - sc.m_%s) / sc.s_%s AS sc_%s", src, name, name, name))
|
|
}
|
|
|
|
return fmt.Sprintf(`
|
|
WITH metric_agg AS (
|
|
SELECT
|
|
time_bucket(INTERVAL '%[1]s', timestamp) AS ws,
|
|
AVG(cpu_percent) AS avg_cpu, MAX(cpu_percent) AS max_cpu, STDDEV_SAMP(cpu_percent) AS std_cpu,
|
|
AVG(cpu_iowait_percent) AS avg_iowait, STDDEV_SAMP(cpu_iowait_percent) AS std_iowait,
|
|
AVG(cpu_softirq_percent) AS avg_softirq, AVG(context_switches_s) AS avg_ctx_switches,
|
|
AVG(interrupts_s) AS avg_interrupts, AVG(softnet_dropped_s) AS avg_softnet_dropped,
|
|
AVG(softnet_time_squeeze_s) AS avg_softnet_squeeze,
|
|
AVG(memory_used_mb) AS avg_mem_used, AVG(memory_cached_mb) AS avg_mem_cached, MAX(memory_dirty_mb) AS max_mem_dirty,
|
|
AVG(net_in_mbps) AS avg_net_in, STDDEV_SAMP(net_in_mbps) AS std_net_in,
|
|
AVG(net_out_mbps) AS avg_net_out, STDDEV_SAMP(net_out_mbps) AS std_net_out,
|
|
SUM(tcp_retrans_s) AS sum_tcp_retrans, SUM(tcp_fast_retrans_s) AS sum_tcp_fast_retrans,
|
|
SUM(tcp_timeouts_s) AS sum_tcp_timeouts, AVG(network_drops_s) AS avg_net_drops,
|
|
AVG(disk_read_mbps) AS avg_disk_read, AVG(disk_write_mbps) AS avg_disk_write,
|
|
AVG(disk_io_ticks_s) AS avg_disk_io_ticks, STDDEV_SAMP(disk_io_ticks_s) AS std_disk_io_ticks,
|
|
SUM(disk_read_time_s) AS sum_disk_read_time, SUM(disk_write_time_s) AS sum_disk_write_time,
|
|
SUM(disk_reads_s) AS sum_disk_reads, SUM(disk_writes_s) AS sum_disk_writes,
|
|
SUM(net_packets_in_s) AS sum_packets_in, SUM(net_packets_out_s) AS sum_packets_out
|
|
FROM raw_metrics GROUP BY 1
|
|
),
|
|
log_agg AS (
|
|
SELECT
|
|
time_bucket(INTERVAL '%[1]s', timestamp) AS ws,
|
|
COUNT(*) AS log_event_count, COUNT(DISTINCT template_id) AS unique_templates,
|
|
SUM(CASE WHEN severity = 'ERROR' THEN 1 ELSE 0 END) AS error_count,
|
|
SUM(CASE
|
|
WHEN severity = 'ERROR' THEN 10
|
|
WHEN severity = 'WARN' THEN 3
|
|
ELSE 1
|
|
END) AS severity_score
|
|
FROM log_events GROUP BY 1
|
|
)%[2]s%[3]s,
|
|
scaler AS (
|
|
SELECT %[4]s FROM scaler_params
|
|
)
|
|
SELECT m.ws,
|
|
m.*, l.log_event_count, l.unique_templates, l.error_count, l.severity_score%[5]s%[6]s,
|
|
%[7]s
|
|
FROM metric_agg m
|
|
LEFT JOIN log_agg l ON m.ws = l.ws
|
|
%[8]s %[9]s
|
|
CROSS JOIN scaler sc
|
|
ORDER BY m.ws DESC LIMIT 1`,
|
|
windowInterval, paramCTE, svcCTE, strings.Join(scFields, ", "), paramSelect, svcSelect, strings.Join(normVecFields, ", "), paramJoin, svcJoin)
|
|
}
|
|
|
|
func BuildLogParamsSchema(patterns []config.MaskingPattern) string {
|
|
cols := []string{"event_time TIMESTAMP WITH TIME ZONE"}
|
|
for _, mp := range patterns {
|
|
if mp.Name == "" {
|
|
continue
|
|
}
|
|
cols = append(cols, fmt.Sprintf("param_%s %s", mp.Name, sqlType(mp.Type)))
|
|
}
|
|
return fmt.Sprintf("CREATE TABLE IF NOT EXISTS log_params (\n\t%s\n)", strings.Join(cols, ",\n\t"))
|
|
}
|
|
|
|
// sqlType maps a masking-pattern type name to the SQL column type used for
// it: "float" becomes DOUBLE, "int" becomes BIGINT, and anything else
// (including the empty string) becomes VARCHAR. Matching is case-sensitive.
func sqlType(t string) string {
	if t == "float" {
		return "DOUBLE"
	}
	if t == "int" {
		return "BIGINT"
	}
	return "VARCHAR"
}
|
|
|
|
func collectNumericCols(patterns []config.MaskingPattern) []string {
|
|
var cols []string
|
|
for _, mp := range patterns {
|
|
if mp.Name == "" || mp.Type == "string" {
|
|
continue
|
|
}
|
|
cols = append(cols, "param_"+mp.Name)
|
|
}
|
|
return cols
|
|
}
|