1091 lines
35 KiB
Go
1091 lines
35 KiB
Go
// Package transform contains the DuckDB-backed Tumbling Window Engine.
|
||
package transform
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"encoding/json"
|
||
"fmt"
|
||
"log"
|
||
"math"
|
||
"strconv"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"codeberg.org/pata1704/guenther/internal/config"
|
||
"codeberg.org/pata1704/guenther/pkg/types"
|
||
"github.com/apache/arrow-go/v18/arrow"
|
||
"github.com/apache/arrow-go/v18/arrow/array"
|
||
"github.com/apache/arrow-go/v18/arrow/memory"
|
||
"github.com/duckdb/duckdb-go/v2"
|
||
)
|
||
|
||
// TransformEngine implements the DuckDB Tumbling-Window fusion layer.
|
||
//
|
||
// Architecture:
|
||
// - raw_metrics (bronze layer): every MetricSnapshot field, 1 Hz rows
|
||
// - log_events: parsed log lines with severity and parameters JSON
|
||
// - log_params: numeric masking-pattern values as typed columns
|
||
// - features: one row per window; feature_json for offline Python EDA
|
||
//
|
||
// Feature vector construction per window:
|
||
// 1. DuckDB fusion query aggregates raw_metrics + log_events into window-level
|
||
// raw aggregates (avg_cpu, sum_tcp_retrans, …) and sc_* columns (DuckDB
|
||
// RobustScaler: (x-median)/IQR, fitted on baseline via FitScaler).
|
||
// 2. processWindow computes engineered features (CPUDelta, NetDelta,
|
||
// CPURollStd, CPUEfficiency, IOWaitProxy, LogDensity) in-process.
|
||
// 3. NormalizedVector layout:
|
||
// [0..22] sc_* DuckDB-scaled raw aggregates
|
||
// [23..28] engineered features (appended unscaled; ratio/delta scale)
|
||
// [29+] Drain param averages (if numeric patterns configured)
|
||
//
|
||
// Any change to NormalizedVector layout requires retraining all detectors.
|
||
type TransformEngine struct {
|
||
cfg *config.Config
|
||
db *sql.DB
|
||
conn *duckdb.Connector // kept for the Arrow zero-copy path
|
||
|
||
logInputChan <-chan types.LogEvent
|
||
metricInputChan <-chan types.MetricSnapshot
|
||
serviceStatusInputChan <-chan types.ServiceStatus
|
||
outputChan chan<- types.FeatureVector
|
||
healthChan chan<- types.StageHealth
|
||
|
||
windowSize time.Duration
|
||
wg sync.WaitGroup
|
||
|
||
mu sync.Mutex
|
||
logBatch []types.LogEvent
|
||
metricBatch []types.MetricSnapshot
|
||
serviceStatusBatch []types.ServiceStatus
|
||
|
||
// lastWindowStart prevents duplicate windows from being emitted when the
|
||
// fusion query returns the same bucket twice within one ticker period.
|
||
lastWindowStart time.Time
|
||
|
||
pool memory.Allocator // Arrow allocator, reused across windows
|
||
|
||
fusionQuery string
|
||
paramNames []string
|
||
featureInsertSQL string
|
||
|
||
patternTypes map[string]string
|
||
|
||
// Stateful buffers for feature engineering.
|
||
// Only accessed from the single engine goroutine – no lock needed.
|
||
cpuHistory []float64 // rolling window of avg_cpu, capacity 12
|
||
lastCPU float64 // avg_cpu from previous window
|
||
lastNetOut float64 // avg_net_out from previous window
|
||
lastNetIn float64 // avg_net_in from previous window (for DeltaNetIn)
|
||
lastTCPRetrans float64 // sum_tcp_retrans from previous window (for DeltaTCPRetrans)
|
||
lastCtxSwitches float64 // avg_ctx_switches from previous window (for DeltaCtx)
|
||
tcpRetransHist []float64 // rolling window of sum_tcp_retrans, capacity 5
|
||
netOutHistory []float64 // rolling window of avg_net_out, capacity 5
|
||
|
||
// scalerFitted is set to true after FitScaler() has successfully written
|
||
// parameters to scaler_params. When false the DuckDB scaler CTE falls back
|
||
// to median=0/iqr=1 (identity transform) so the engine functions before
|
||
// the baseline period has been recorded.
|
||
scalerFitted bool
|
||
|
||
processed uint64 // monotone window counter for health reporting
|
||
avgLatency float64 // EWMA latency in ms for health reporting
|
||
}
|
||
|
||
// NewTransformEngine opens DuckDB, creates all tables, and pre-compiles queries.
|
||
func NewTransformEngine(
|
||
cfg *config.Config,
|
||
logInput <-chan types.LogEvent,
|
||
metricInput <-chan types.MetricSnapshot,
|
||
serviceStatusInput <-chan types.ServiceStatus,
|
||
output chan<- types.FeatureVector,
|
||
health chan<- types.StageHealth,
|
||
) (*TransformEngine, error) {
|
||
conn, err := duckdb.NewConnector(cfg.Transformation.DbPath, nil)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("transform: open duckdb %q: %w", cfg.Transformation.DbPath, err)
|
||
}
|
||
db := sql.OpenDB(conn)
|
||
|
||
if err := createSchema(db, cfg); err != nil {
|
||
db.Close()
|
||
return nil, err
|
||
}
|
||
|
||
windowInterval := fmt.Sprintf("%d seconds", int(cfg.Transformation.WindowSize.Seconds()))
|
||
fusionQuery := BuildFusionQuery(cfg.Drain.MaskingPatterns, cfg.Ingestion.SystemctlServices, windowInterval)
|
||
|
||
var paramNames []string
|
||
patternTypes := make(map[string]string, len(cfg.Drain.MaskingPatterns))
|
||
for _, mp := range cfg.Drain.MaskingPatterns {
|
||
if mp.Name != "" {
|
||
paramNames = append(paramNames, mp.Name)
|
||
patternTypes[mp.Name] = mp.Type
|
||
}
|
||
}
|
||
|
||
return &TransformEngine{
|
||
cfg: cfg,
|
||
db: db,
|
||
conn: conn,
|
||
logInputChan: logInput,
|
||
metricInputChan: metricInput,
|
||
serviceStatusInputChan: serviceStatusInput,
|
||
outputChan: output,
|
||
healthChan: health,
|
||
windowSize: cfg.Transformation.WindowSize,
|
||
pool: memory.NewGoAllocator(),
|
||
fusionQuery: fusionQuery,
|
||
paramNames: paramNames,
|
||
patternTypes: patternTypes,
|
||
featureInsertSQL: `
|
||
INSERT INTO features (window_start, feature_json)
|
||
VALUES (?, ?)
|
||
ON CONFLICT (window_start) DO UPDATE
|
||
SET feature_json = excluded.feature_json`,
|
||
}, nil
|
||
}
|
||
|
||
func createSchema(db *sql.DB, cfg *config.Config) error {
|
||
stmts := []struct {
|
||
sql string
|
||
desc string
|
||
}{
|
||
{"SET memory_limit = '512MB'", "set memory_limit"},
|
||
{"SET threads = 2", "set threads"},
|
||
{`CREATE TABLE IF NOT EXISTS raw_metrics (
|
||
timestamp TIMESTAMP WITH TIME ZONE,
|
||
cpu_percent DOUBLE,
|
||
cpu_iowait_percent DOUBLE,
|
||
cpu_softirq_percent DOUBLE,
|
||
context_switches_s DOUBLE,
|
||
interrupts_s DOUBLE,
|
||
memory_used_mb DOUBLE,
|
||
memory_cached_mb DOUBLE,
|
||
memory_dirty_mb DOUBLE,
|
||
net_in_mbps DOUBLE,
|
||
net_out_mbps DOUBLE,
|
||
disk_read_mbps DOUBLE,
|
||
disk_write_mbps DOUBLE,
|
||
disk_read_time_s DOUBLE,
|
||
disk_write_time_s DOUBLE,
|
||
disk_io_ticks_s DOUBLE,
|
||
network_errors_s DOUBLE,
|
||
network_drops_s DOUBLE,
|
||
tcp_retrans_s DOUBLE,
|
||
tcp_timeouts_s DOUBLE,
|
||
tcp_lost_retransmit_s DOUBLE,
|
||
tcp_fast_retrans_s DOUBLE,
|
||
softnet_dropped_s DOUBLE,
|
||
softnet_time_squeeze_s DOUBLE,
|
||
net_packets_in_s DOUBLE,
|
||
net_packets_out_s DOUBLE,
|
||
disk_reads_s DOUBLE,
|
||
disk_writes_s DOUBLE
|
||
)`, "create raw_metrics table"},
|
||
{`CREATE TABLE IF NOT EXISTS log_events (
|
||
timestamp TIMESTAMP WITH TIME ZONE,
|
||
template_id INTEGER,
|
||
severity VARCHAR,
|
||
parameters JSON
|
||
)`, "create log_events table"},
|
||
{`CREATE TABLE IF NOT EXISTS service_status (
|
||
timestamp TIMESTAMP WITH TIME ZONE,
|
||
service_name VARCHAR,
|
||
active_state VARCHAR,
|
||
sub_state VARCHAR
|
||
)`, "create service_status table"},
|
||
{BuildLogParamsSchema(cfg.Drain.MaskingPatterns), "create log_params table"},
|
||
// scaler_params: one row per feature; median (Q50) + IQR (Q75-Q25) fitted
|
||
// from baseline data via FitScaler(). Queried inline by BuildFusionQuery.
|
||
{BuildScalerParamsTable(), "create scaler_params table"},
|
||
// features: one row per window; anomaly_score is back-filled by the
|
||
// detector via UpdateAnomalyScore after the detection stage runs.
|
||
{`CREATE TABLE IF NOT EXISTS features (
|
||
window_start TIMESTAMP WITH TIME ZONE PRIMARY KEY,
|
||
window_end TIMESTAMP WITH TIME ZONE,
|
||
anomaly_score DOUBLE,
|
||
feature_json JSON
|
||
)`, "create features table"},
|
||
}
|
||
|
||
for _, s := range stmts {
|
||
if _, err := db.Exec(s.sql); err != nil {
|
||
return fmt.Errorf("transform: %s: %w", s.desc, err)
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// ── lifecycle ─────────────────────────────────────────────────────────────────
|
||
|
||
// Start launches the transform engine goroutine.
|
||
//
|
||
// Design: ingestion (log/metric channel reads) and control (ticker, ctx) are
|
||
// handled in the SAME goroutine to avoid lock contention on the batches.
|
||
// ctx.Done() is checked with priority via a non-blocking pre-select before the
|
||
// blocking select, guaranteeing that the engine exits within one iteration of
|
||
// cancellation regardless of channel pressure.
|
||
func (e *TransformEngine) Start(ctx context.Context) {
|
||
ticker := time.NewTicker(e.windowSize)
|
||
reportTicker := time.NewTicker(5 * time.Second)
|
||
|
||
e.wg.Add(1)
|
||
go func() {
|
||
defer e.wg.Done()
|
||
defer ticker.Stop()
|
||
defer reportTicker.Stop()
|
||
|
||
for {
|
||
// Priority exit: avoids starvation by busy input channels.
|
||
select {
|
||
case <-ctx.Done():
|
||
e.db.Close()
|
||
return
|
||
default:
|
||
}
|
||
|
||
select {
|
||
case ev := <-e.logInputChan:
|
||
e.mu.Lock()
|
||
e.logBatch = append(e.logBatch, ev)
|
||
e.mu.Unlock()
|
||
|
||
case snap := <-e.metricInputChan:
|
||
e.mu.Lock()
|
||
e.metricBatch = append(e.metricBatch, snap)
|
||
e.mu.Unlock()
|
||
|
||
case stat := <-e.serviceStatusInputChan:
|
||
e.mu.Lock()
|
||
e.serviceStatusBatch = append(e.serviceStatusBatch, stat)
|
||
e.mu.Unlock()
|
||
|
||
case <-ticker.C:
|
||
start := time.Now()
|
||
e.processWindow()
|
||
ms := time.Since(start).Seconds() * 1e3
|
||
|
||
e.mu.Lock()
|
||
e.processed++
|
||
if e.avgLatency == 0 {
|
||
e.avgLatency = ms
|
||
} else {
|
||
e.avgLatency = e.avgLatency*0.8 + ms*0.2
|
||
}
|
||
e.mu.Unlock()
|
||
|
||
case <-reportTicker.C:
|
||
e.emitHealth()
|
||
|
||
case <-ctx.Done():
|
||
e.db.Close()
|
||
return
|
||
}
|
||
}
|
||
}()
|
||
}
|
||
|
||
// Wait waits for the transform engine goroutine to exit after context cancellation.
|
||
func (e *TransformEngine) Wait() {
|
||
e.wg.Wait()
|
||
}
|
||
|
||
// FitScaler computes RobustScaler parameters (median Q50 + IQR Q75-Q25) from
|
||
// raw_metrics and log_events rows in the half-open interval [fitStart, fitEnd)
|
||
// and persists them to scaler_params.
|
||
//
|
||
// The RobustScaler uses median + IQR instead of mean + stddev to resist extreme
|
||
// outliers in the baseline that would inflate the mean and push anomaly-phase z-scores
|
||
// toward zero (as a StandardScaler would). This matches
|
||
// sklearn.RobustScaler(quantile_range=(25, 75)).
|
||
//
|
||
// Call once after the baseline period and before the anomaly phase. After this,
|
||
// BuildFusionQuery returns already-scaled sc_* values that populate
|
||
// NormalizedVector slots 0–22.
|
||
//
|
||
// FitScaler is safe to call from any goroutine; it acquires no engine locks
|
||
// because it only writes to DuckDB (which serialises writes internally).
|
||
func (e *TransformEngine) FitScaler(fitStart, fitEnd time.Time) error {
|
||
q := BuildFitScalerQuery()
|
||
if _, err := e.db.Exec(q, fitStart, fitEnd); err != nil {
|
||
return fmt.Errorf("transform: FitScaler [%s, %s): %w",
|
||
fitStart.Format(time.RFC3339), fitEnd.Format(time.RFC3339), err)
|
||
}
|
||
e.mu.Lock()
|
||
e.scalerFitted = true
|
||
e.mu.Unlock()
|
||
log.Printf("transform: FitScaler done – baseline [%s, %s)",
|
||
fitStart.Format("15:04:05Z"), fitEnd.Format("15:04:05Z"))
|
||
return nil
|
||
}
|
||
|
||
// ── window processing ─────────────────────────────────────────────────────────
|
||
|
||
func (e *TransformEngine) processWindow() {
|
||
e.mu.Lock()
|
||
logs := e.logBatch
|
||
metrics := e.metricBatch
|
||
serviceStats := e.serviceStatusBatch
|
||
e.logBatch = nil
|
||
e.metricBatch = nil
|
||
e.serviceStatusBatch = nil
|
||
e.mu.Unlock()
|
||
|
||
if len(metrics) == 0 && len(logs) == 0 && len(serviceStats) == 0 {
|
||
return
|
||
}
|
||
|
||
if err := e.loadToDuckDB(logs, metrics, serviceStats); err != nil {
|
||
log.Printf("transform: load: %v", err)
|
||
return
|
||
}
|
||
|
||
fv, err := e.runFusionQuery()
|
||
if err != nil {
|
||
log.Printf("transform: fusion query: %v", err)
|
||
return
|
||
}
|
||
if fv == nil || !fv.WindowStart.After(e.lastWindowStart) {
|
||
return
|
||
}
|
||
|
||
e.lastWindowStart = fv.WindowStart
|
||
fv.WindowEnd = fv.WindowStart.Add(e.windowSize)
|
||
fv.Timestamp = time.Now()
|
||
|
||
// ── Feature Engineering ───────────────────────────────────────────────────
|
||
// Temporal features capture cross-window dynamics not available from a
|
||
// single-window aggregate. They are appended to NormalizedVector after
|
||
// the DuckDB-scaled raw features.
|
||
//
|
||
// These features are NOT passed through the DuckDB RobustScaler because:
|
||
// (a) CPUDelta / NetDelta are already on a %-point / MBps scale; the
|
||
// detector needs the sign (positive = spike, negative = drop).
|
||
// (b) CPURollStd, CPUEfficiency, IOWaitProxy, LogDensity are ratio/
|
||
// dimensionless features – their natural scale is already bounded.
|
||
// Scaling them would require additional scaler_params rows and a second
|
||
// CTE in BuildFusionQuery for marginal gain;
|
||
fv.CPUDelta = fv.AvgCPUPercent - e.lastCPU
|
||
fv.NetDelta = fv.AvgNetOutMBps - e.lastNetOut
|
||
fv.DeltaCtx = fv.AvgCtxSwitches - e.lastCtxSwitches
|
||
fv.DeltaNetIn = fv.AvgNetInMBps - e.lastNetIn
|
||
fv.DeltaTCPRetrans = fv.SumTCPRetrans - e.lastTCPRetrans
|
||
|
||
e.cpuHistory = append(e.cpuHistory, fv.AvgCPUPercent)
|
||
if len(e.cpuHistory) > 12 {
|
||
e.cpuHistory = e.cpuHistory[1:]
|
||
}
|
||
fv.CPURollStd = calcStdDev(e.cpuHistory)
|
||
|
||
fv.CPUEfficiency = fv.AvgCPUPercent / (fv.AvgNetThroughput + 1.0)
|
||
fv.CPUPerMB = fv.AvgCPUPercent / (fv.AvgNetThroughput + 1e-3)
|
||
fv.NetworkDiskRatio = fv.AvgNetThroughput / (fv.AvgDiskReadMBps + fv.AvgDiskWriteMBps + 1e-3)
|
||
fv.RetransPerMB = fv.SumTCPRetrans / (fv.AvgNetThroughput + 1e-3)
|
||
fv.IOWaitProxy = fv.AvgDiskIOTicks / (fv.AvgCPUPercent + 1.0)
|
||
fv.LogDensity = float64(fv.UniqueTemplates) / (float64(fv.LogCountTotal) + 1.0)
|
||
|
||
e.tcpRetransHist = append(e.tcpRetransHist, fv.SumTCPRetrans)
|
||
if len(e.tcpRetransHist) > 5 {
|
||
e.tcpRetransHist = e.tcpRetransHist[1:]
|
||
}
|
||
fv.TcpRollStd = calcStdDev(e.tcpRetransHist)
|
||
|
||
e.netOutHistory = append(e.netOutHistory, fv.AvgNetOutMBps)
|
||
if len(e.netOutHistory) > 5 {
|
||
e.netOutHistory = e.netOutHistory[1:]
|
||
}
|
||
fv.NetRollStd = calcStdDev(e.netOutHistory)
|
||
|
||
fv.MemPressure = fv.MaxMemDirtyMB / (fv.AvgMemUsedMB + 1.0)
|
||
fv.NetAsymmetry = fv.AvgNetInMBps / (fv.AvgNetOutMBps + 1e-3)
|
||
|
||
e.lastCPU = fv.AvgCPUPercent
|
||
e.lastNetOut = fv.AvgNetOutMBps
|
||
e.lastNetIn = fv.AvgNetInMBps
|
||
e.lastTCPRetrans = fv.SumTCPRetrans
|
||
e.lastCtxSwitches = fv.AvgCtxSwitches
|
||
|
||
// ── Final NormalizedVector Assembly ──────────────────────────────────────
|
||
// sc_* base features (from DuckDB) are already at fv.NormalizedVector[0:len(scalerFeatureNames)]
|
||
fv.NormalizedVector = append(fv.NormalizedVector,
|
||
fv.CPUDelta,
|
||
fv.CPURollStd,
|
||
fv.CPUEfficiency,
|
||
fv.DeltaCtx,
|
||
fv.NetDelta,
|
||
fv.AvgNetThroughput,
|
||
fv.CPUPerMB,
|
||
fv.NetworkDiskRatio,
|
||
fv.RetransPerPacket,
|
||
fv.RetransPerMB,
|
||
fv.AvgDiskLatencyMS,
|
||
float64(fv.LogCountTotal),
|
||
float64(fv.UniqueTemplates),
|
||
fv.LogDensity,
|
||
fv.IOWaitProxy,
|
||
fv.DeltaNetIn,
|
||
fv.DeltaTCPRetrans,
|
||
fv.TcpRollStd,
|
||
fv.NetRollStd,
|
||
fv.MemPressure,
|
||
fv.NetAsymmetry,
|
||
)
|
||
|
||
// Append service statuses (slots 35 to 35+N-1).
|
||
// These are already encoded as 1/0/-1 and don't need RobustScaler.
|
||
for _, svc := range e.cfg.Ingestion.SystemctlServices {
|
||
safeName := strings.ReplaceAll(strings.ReplaceAll(svc, ".", "_"), "-", "_")
|
||
val := fv.ServiceStatuses[safeName]
|
||
fv.NormalizedVector = append(fv.NormalizedVector, val)
|
||
}
|
||
|
||
// Append param averages (slots 35+N onwards) for any numeric masking patterns.
|
||
for _, name := range e.cfg.NumericPatternNames() {
|
||
fv.NormalizedVector = append(fv.NormalizedVector, fv.ParamAvg[name])
|
||
}
|
||
|
||
if err := e.persistFeatureVector(fv); err != nil {
|
||
log.Printf("transform: persist feature vector: %v", err)
|
||
}
|
||
|
||
select {
|
||
case e.outputChan <- *fv:
|
||
default:
|
||
log.Printf("transform: output channel full – dropping feature vector for window %s",
|
||
fv.WindowStart.Format(time.RFC3339))
|
||
}
|
||
}
|
||
|
||
func (e *TransformEngine) loadToDuckDB(logs []types.LogEvent, metrics []types.MetricSnapshot, serviceStats []types.ServiceStatus) error {
|
||
if len(metrics) > 0 {
|
||
if err := e.loadMetricsArrow(metrics); err != nil {
|
||
return fmt.Errorf("metrics: %w", err)
|
||
}
|
||
}
|
||
if len(logs) > 0 {
|
||
if err := e.loadLogs(logs); err != nil {
|
||
return fmt.Errorf("logs: %w", err)
|
||
}
|
||
}
|
||
if len(serviceStats) > 0 {
|
||
if err := e.loadServiceStatus(serviceStats); err != nil {
|
||
return fmt.Errorf("service status: %w", err)
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (e *TransformEngine) loadServiceStatus(stats []types.ServiceStatus) error {
|
||
tx, err := e.db.Begin()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer tx.Rollback()
|
||
|
||
stmt, err := tx.Prepare("INSERT INTO service_status (timestamp, service_name, active_state, sub_state) VALUES (?, ?, ?, ?)")
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer stmt.Close()
|
||
|
||
for _, s := range stats {
|
||
if _, err := stmt.Exec(s.Timestamp, s.ServiceName, s.ActiveState, s.SubState); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return tx.Commit()
|
||
}
|
||
|
||
// ── Arrow insert (raw_metrics) ────────────────────────────────────────────────
|
||
|
||
// loadMetricsArrow inserts MetricSnapshots into DuckDB via the Arrow
|
||
// RegisterView + INSERT SELECT path (zero-copy on the Go side).
|
||
//
|
||
// The dedicated dbConn is closed before returning so DuckDB commits the rows
|
||
// before the subsequent fusion query reads them.
|
||
func (e *TransformEngine) loadMetricsArrow(metrics []types.MetricSnapshot) error {
|
||
schema := arrow.NewSchema([]arrow.Field{
|
||
{Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ns},
|
||
{Name: "cpu_percent", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "cpu_iowait_percent", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "cpu_softirq_percent", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "context_switches_s", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "interrupts_s", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "memory_used_mb", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "memory_cached_mb", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "memory_dirty_mb", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "net_in_mbps", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "net_out_mbps", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "disk_read_mbps", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "disk_write_mbps", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "disk_read_time_s", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "disk_write_time_s", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "disk_io_ticks_s", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "network_errors_s", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "network_drops_s", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "tcp_retrans_s", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "tcp_timeouts_s", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "tcp_lost_retransmit_s", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "tcp_fast_retrans_s", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "softnet_dropped_s", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "softnet_time_squeeze_s", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "net_packets_in_s", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "net_packets_out_s", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "disk_reads_s", Type: arrow.PrimitiveTypes.Float64},
|
||
{Name: "disk_writes_s", Type: arrow.PrimitiveTypes.Float64},
|
||
}, nil)
|
||
|
||
b := array.NewRecordBuilder(e.pool, schema)
|
||
defer b.Release()
|
||
|
||
tsb := b.Field(0).(*array.TimestampBuilder)
|
||
cpuF := b.Field(1).(*array.Float64Builder)
|
||
iowait := b.Field(2).(*array.Float64Builder)
|
||
softirq := b.Field(3).(*array.Float64Builder)
|
||
ctxt := b.Field(4).(*array.Float64Builder)
|
||
intr := b.Field(5).(*array.Float64Builder)
|
||
memUsed := b.Field(6).(*array.Float64Builder)
|
||
memCached := b.Field(7).(*array.Float64Builder)
|
||
memDirty := b.Field(8).(*array.Float64Builder)
|
||
netIn := b.Field(9).(*array.Float64Builder)
|
||
netOut := b.Field(10).(*array.Float64Builder)
|
||
diskRead := b.Field(11).(*array.Float64Builder)
|
||
diskWrite := b.Field(12).(*array.Float64Builder)
|
||
diskRTime := b.Field(13).(*array.Float64Builder)
|
||
diskWTime := b.Field(14).(*array.Float64Builder)
|
||
diskTicks := b.Field(15).(*array.Float64Builder)
|
||
netErr := b.Field(16).(*array.Float64Builder)
|
||
netDrop := b.Field(17).(*array.Float64Builder)
|
||
tcpRetrans := b.Field(18).(*array.Float64Builder)
|
||
tcpTimeout := b.Field(19).(*array.Float64Builder)
|
||
tcpLost := b.Field(20).(*array.Float64Builder)
|
||
tcpFast := b.Field(21).(*array.Float64Builder)
|
||
softDrop := b.Field(22).(*array.Float64Builder)
|
||
softSqueeze := b.Field(23).(*array.Float64Builder)
|
||
pIn := b.Field(24).(*array.Float64Builder)
|
||
pOut := b.Field(25).(*array.Float64Builder)
|
||
dRComp := b.Field(26).(*array.Float64Builder)
|
||
dWComp := b.Field(27).(*array.Float64Builder)
|
||
|
||
for i := range metrics {
|
||
m := &metrics[i]
|
||
tsb.Append(arrow.Timestamp(m.Timestamp.UnixNano()))
|
||
cpuF.Append(m.CPUPercent)
|
||
iowait.Append(m.CPUIoWaitPercent)
|
||
softirq.Append(m.CPUSoftIrqPercent)
|
||
ctxt.Append(m.ContextSwitchesPerS)
|
||
intr.Append(m.InterruptsPerS)
|
||
memUsed.Append(m.MemoryUsedMB)
|
||
memCached.Append(m.MemoryCachedMB)
|
||
memDirty.Append(m.MemoryDirtyMB)
|
||
netIn.Append(m.NetworkInMBps)
|
||
netOut.Append(m.NetworkOutMBps)
|
||
diskRead.Append(m.DiskReadMBps)
|
||
diskWrite.Append(m.DiskWriteMBps)
|
||
diskRTime.Append(m.DiskReadTimeMsPerS)
|
||
diskWTime.Append(m.DiskWriteTimeMsPerS)
|
||
diskTicks.Append(m.DiskIOTicksPerS)
|
||
netErr.Append(m.NetErrorsPerS)
|
||
netDrop.Append(m.NetDropsPerS)
|
||
tcpRetrans.Append(m.TCPRetransPerS)
|
||
tcpTimeout.Append(m.TCPTimeoutsPerS)
|
||
tcpLost.Append(m.TCPLostRetransmitPerS)
|
||
tcpFast.Append(m.TCPFastRetransPerS)
|
||
softDrop.Append(m.SoftnetDroppedPerS)
|
||
softSqueeze.Append(m.SoftnetTimeSqueezePerS)
|
||
pIn.Append(m.NetPacketsInPerS)
|
||
pOut.Append(m.NetPacketsOutPerS)
|
||
dRComp.Append(m.DiskReadsCompletedPerS)
|
||
dWComp.Append(m.DiskWritesCompletedPerS)
|
||
}
|
||
|
||
rec := b.NewRecordBatch()
|
||
defer rec.Release()
|
||
|
||
dbConn, err := e.conn.Connect(context.Background())
|
||
if err != nil {
|
||
return fmt.Errorf("connect: %w", err)
|
||
}
|
||
var connErr error
|
||
defer func() {
|
||
if cerr := dbConn.Close(); cerr != nil && connErr == nil {
|
||
connErr = fmt.Errorf("close arrow conn: %w", cerr)
|
||
}
|
||
}()
|
||
|
||
arr, err := duckdb.NewArrowFromConn(dbConn)
|
||
if err != nil {
|
||
return fmt.Errorf("create arrow interface: %w", err)
|
||
}
|
||
|
||
rdr, err := array.NewRecordReader(schema, []arrow.RecordBatch{rec})
|
||
if err != nil {
|
||
return fmt.Errorf("create record reader: %w", err)
|
||
}
|
||
defer rdr.Release()
|
||
|
||
releaseView, err := arr.RegisterView(rdr, "temp_metrics_arrow_view")
|
||
if err != nil {
|
||
return fmt.Errorf("register arrow view: %w", err)
|
||
}
|
||
defer releaseView()
|
||
|
||
res, err := arr.QueryContext(context.Background(),
|
||
"INSERT INTO raw_metrics SELECT * FROM temp_metrics_arrow_view")
|
||
if err != nil {
|
||
return fmt.Errorf("insert from arrow view: %w", err)
|
||
}
|
||
if res != nil {
|
||
res.Release()
|
||
}
|
||
return connErr
|
||
}
|
||
|
||
// ── SQL insert (log_events + log_params) ──────────────────────────────────────
|
||
|
||
func (e *TransformEngine) loadLogs(logs []types.LogEvent) error {
|
||
if len(logs) == 0 {
|
||
return nil
|
||
}
|
||
if err := e.loadLogEventsArrow(logs); err != nil {
|
||
return fmt.Errorf("load log events arrow: %w", err)
|
||
}
|
||
if len(e.paramNames) > 0 {
|
||
if err := e.loadLogParamsArrow(logs); err != nil {
|
||
return fmt.Errorf("load log params arrow: %w", err)
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (e *TransformEngine) loadLogEventsArrow(logs []types.LogEvent) error {
|
||
schema := arrow.NewSchema([]arrow.Field{
|
||
{Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ns},
|
||
{Name: "template_id", Type: arrow.PrimitiveTypes.Int32},
|
||
{Name: "severity", Type: arrow.BinaryTypes.String},
|
||
{Name: "parameters", Type: arrow.BinaryTypes.String},
|
||
}, nil)
|
||
|
||
b := array.NewRecordBuilder(e.pool, schema)
|
||
defer b.Release()
|
||
|
||
tsb := b.Field(0).(*array.TimestampBuilder)
|
||
tid := b.Field(1).(*array.Int32Builder)
|
||
sev := b.Field(2).(*array.StringBuilder)
|
||
prm := b.Field(3).(*array.StringBuilder)
|
||
|
||
for i := range logs {
|
||
l := &logs[i]
|
||
tsb.Append(arrow.Timestamp(l.Timestamp.UnixNano()))
|
||
tid.Append(int32(l.TemplateID))
|
||
sev.Append(l.Severity)
|
||
|
||
paramJSON, jerr := json.Marshal(l.Params)
|
||
if jerr != nil {
|
||
prm.Append("{}")
|
||
} else {
|
||
prm.Append(string(paramJSON))
|
||
}
|
||
}
|
||
|
||
rec := b.NewRecordBatch()
|
||
defer rec.Release()
|
||
|
||
return e.insertArrowRecord(schema, rec,
|
||
"temp_logs_arrow_view",
|
||
"INSERT INTO log_events SELECT * FROM temp_logs_arrow_view")
|
||
}
|
||
|
||
func (e *TransformEngine) loadLogParamsArrow(logs []types.LogEvent) error {
|
||
fields := []arrow.Field{
|
||
{Name: "event_time", Type: arrow.FixedWidthTypes.Timestamp_ns},
|
||
}
|
||
for _, name := range e.paramNames {
|
||
var typ arrow.DataType = arrow.BinaryTypes.String
|
||
switch e.patternTypes[name] {
|
||
case "float":
|
||
typ = arrow.PrimitiveTypes.Float64
|
||
case "int":
|
||
typ = arrow.PrimitiveTypes.Int64
|
||
}
|
||
fields = append(fields, arrow.Field{Name: "param_" + name, Type: typ})
|
||
}
|
||
|
||
schema := arrow.NewSchema(fields, nil)
|
||
b := array.NewRecordBuilder(e.pool, schema)
|
||
defer b.Release()
|
||
|
||
tsb := b.Field(0).(*array.TimestampBuilder)
|
||
rowCount := 0
|
||
|
||
for i := range logs {
|
||
l := &logs[i]
|
||
if len(l.Params) == 0 {
|
||
continue
|
||
}
|
||
|
||
// Skip log events that carry none of the configured param names.
|
||
hasValue := false
|
||
for _, name := range e.paramNames {
|
||
if _, ok := l.Params[name]; ok {
|
||
hasValue = true
|
||
break
|
||
}
|
||
}
|
||
if !hasValue {
|
||
continue
|
||
}
|
||
|
||
tsb.Append(arrow.Timestamp(l.Timestamp.UnixNano()))
|
||
for j, name := range e.paramNames {
|
||
raw, ok := l.Params[name]
|
||
if !ok {
|
||
b.Field(j + 1).AppendNull()
|
||
continue
|
||
}
|
||
raw = strings.TrimSpace(raw)
|
||
switch e.patternTypes[name] {
|
||
case "float":
|
||
f, err := strconv.ParseFloat(raw, 64)
|
||
if err == nil {
|
||
b.Field(j + 1).(*array.Float64Builder).Append(f)
|
||
} else {
|
||
log.Printf("transform: cannot parse float param %q=%q", name, raw)
|
||
b.Field(j + 1).AppendNull()
|
||
}
|
||
case "int":
|
||
iv, err := strconv.ParseInt(raw, 10, 64)
|
||
if err == nil {
|
||
b.Field(j + 1).(*array.Int64Builder).Append(iv)
|
||
} else {
|
||
log.Printf("transform: cannot parse int param %q=%q", name, raw)
|
||
b.Field(j + 1).AppendNull()
|
||
}
|
||
default:
|
||
b.Field(j + 1).(*array.StringBuilder).Append(raw)
|
||
}
|
||
}
|
||
rowCount++
|
||
}
|
||
|
||
if rowCount == 0 {
|
||
return nil
|
||
}
|
||
|
||
rec := b.NewRecordBatch()
|
||
defer rec.Release()
|
||
|
||
cols := make([]string, 0, 1+len(e.paramNames))
|
||
cols = append(cols, "event_time")
|
||
for _, name := range e.paramNames {
|
||
cols = append(cols, "param_"+name)
|
||
}
|
||
query := fmt.Sprintf(
|
||
"INSERT INTO log_params (%s) SELECT * FROM temp_params_arrow_view",
|
||
strings.Join(cols, ", "),
|
||
)
|
||
return e.insertArrowRecord(schema, rec, "temp_params_arrow_view", query)
|
||
}
|
||
|
||
// insertArrowRecord registers rec as a DuckDB view named viewName and executes
|
||
// insertQuery against it. The connection is closed before returning to ensure
|
||
// DuckDB commits the rows prior to the next read.
|
||
func (e *TransformEngine) insertArrowRecord(
|
||
schema *arrow.Schema,
|
||
rec arrow.RecordBatch,
|
||
viewName, insertQuery string,
|
||
) error {
|
||
dbConn, err := e.conn.Connect(context.Background())
|
||
if err != nil {
|
||
return fmt.Errorf("connect: %w", err)
|
||
}
|
||
var connErr error
|
||
defer func() {
|
||
if cerr := dbConn.Close(); cerr != nil && connErr == nil {
|
||
connErr = fmt.Errorf("close arrow conn: %w", cerr)
|
||
}
|
||
}()
|
||
|
||
arr, err := duckdb.NewArrowFromConn(dbConn)
|
||
if err != nil {
|
||
return fmt.Errorf("create arrow interface: %w", err)
|
||
}
|
||
|
||
rdr, err := array.NewRecordReader(schema, []arrow.RecordBatch{rec})
|
||
if err != nil {
|
||
return fmt.Errorf("create record reader: %w", err)
|
||
}
|
||
defer rdr.Release()
|
||
|
||
releaseView, err := arr.RegisterView(rdr, viewName)
|
||
if err != nil {
|
||
return fmt.Errorf("register arrow view %s: %w", viewName, err)
|
||
}
|
||
defer releaseView()
|
||
|
||
res, err := arr.QueryContext(context.Background(), insertQuery)
|
||
if err != nil {
|
||
return fmt.Errorf("insert from arrow view %s: %w", viewName, err)
|
||
}
|
||
if res != nil {
|
||
res.Release()
|
||
}
|
||
return connErr
|
||
}
|
||
|
||
// ── fusion query ──────────────────────────────────────────────────────────────
|
||
|
||
// runFusionQuery executes the pre-compiled fusion query and maps the result
|
||
// row into a FeatureVector.
|
||
//
|
||
// Slots populated here:
|
||
// - struct fields (raw aggregates) for feature engineering and feature_json
|
||
// - NormalizedVector[0..22]: sc_* DuckDB RobustScaler output
|
||
// - ParamAvg map: avg_param_* columns
|
||
//
|
||
// Slots 23+ of NormalizedVector are populated by processWindow after this call.
|
||
func (e *TransformEngine) runFusionQuery() (*types.FeatureVector, error) {
|
||
rows, err := e.db.Query(e.fusionQuery)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("execute: %w", err)
|
||
}
|
||
defer rows.Close()
|
||
|
||
if !rows.Next() {
|
||
return nil, rows.Err()
|
||
}
|
||
|
||
cols, err := rows.Columns()
|
||
if err != nil {
|
||
return nil, fmt.Errorf("columns: %w", err)
|
||
}
|
||
|
||
dest := make([]any, len(cols))
|
||
ptrs := make([]any, len(cols))
|
||
for i := range dest {
|
||
ptrs[i] = &dest[i]
|
||
}
|
||
if err := rows.Scan(ptrs...); err != nil {
|
||
return nil, fmt.Errorf("scan: %w", err)
|
||
}
|
||
|
||
fv := &types.FeatureVector{
|
||
ParamAvg: make(map[string]float64),
|
||
ServiceStatuses: make(map[string]float64),
|
||
}
|
||
|
||
// Additional raw sums for derived features
|
||
var sumDiskRTime, sumDiskWTime, sumDiskReads, sumDiskWrites, sumPacketsOut float64
|
||
|
||
for i, col := range cols {
|
||
if dest[i] == nil {
|
||
continue
|
||
}
|
||
switch col {
|
||
case "ws":
|
||
if t, ok := dest[i].(time.Time); ok {
|
||
fv.WindowStart = t
|
||
}
|
||
case "avg_cpu":
|
||
fv.AvgCPUPercent = toFloat64(dest[i])
|
||
case "max_cpu":
|
||
fv.MaxCPUPercent = toFloat64(dest[i])
|
||
case "std_cpu":
|
||
fv.StdCPUPercent = toFloat64(dest[i])
|
||
case "avg_iowait":
|
||
fv.AvgCPUIoWait = toFloat64(dest[i])
|
||
case "std_iowait":
|
||
fv.StdCPUIoWait = toFloat64(dest[i])
|
||
case "avg_softirq":
|
||
fv.AvgCPUSoftIrq = toFloat64(dest[i])
|
||
case "avg_ctx_switches":
|
||
fv.AvgCtxSwitches = toFloat64(dest[i])
|
||
case "avg_interrupts":
|
||
fv.AvgInterrupts = toFloat64(dest[i])
|
||
case "avg_softnet_dropped":
|
||
fv.AvgSoftnetDropped = toFloat64(dest[i])
|
||
case "avg_softnet_squeeze":
|
||
fv.AvgSoftnetSqueeze = toFloat64(dest[i])
|
||
case "avg_mem_used":
|
||
fv.AvgMemUsedMB = toFloat64(dest[i])
|
||
case "avg_mem_cached":
|
||
fv.AvgMemCachedMB = toFloat64(dest[i])
|
||
case "max_mem_dirty":
|
||
fv.MaxMemDirtyMB = toFloat64(dest[i])
|
||
case "avg_net_in":
|
||
fv.AvgNetInMBps = toFloat64(dest[i])
|
||
case "std_net_in":
|
||
fv.StdNetInMBps = toFloat64(dest[i])
|
||
case "avg_net_out":
|
||
fv.AvgNetOutMBps = toFloat64(dest[i])
|
||
case "std_net_out":
|
||
fv.StdNetOutMBps = toFloat64(dest[i])
|
||
case "avg_net_drops":
|
||
fv.AvgNetDrops = toFloat64(dest[i])
|
||
case "sum_tcp_retrans":
|
||
fv.SumTCPRetrans = toFloat64(dest[i])
|
||
case "sum_tcp_fast_retrans":
|
||
fv.SumTCPFastRetrans = toFloat64(dest[i])
|
||
case "sum_tcp_timeouts":
|
||
fv.SumTCPTimeouts = toFloat64(dest[i])
|
||
case "avg_disk_read":
|
||
fv.AvgDiskReadMBps = toFloat64(dest[i])
|
||
case "avg_disk_write":
|
||
fv.AvgDiskWriteMBps = toFloat64(dest[i])
|
||
case "avg_disk_io_ticks":
|
||
fv.AvgDiskIOTicks = toFloat64(dest[i])
|
||
case "std_disk_io_ticks":
|
||
fv.StdDiskIOTicks = toFloat64(dest[i])
|
||
case "error_count":
|
||
fv.ErrorCount = int(toInt64(dest[i]))
|
||
case "severity_score":
|
||
fv.SeverityScore = toFloat64(dest[i])
|
||
case "log_event_count":
|
||
fv.LogCountTotal = int(toInt64(dest[i]))
|
||
case "unique_templates":
|
||
fv.UniqueTemplates = int(toInt64(dest[i]))
|
||
case "sum_disk_read_time":
|
||
sumDiskRTime = toFloat64(dest[i])
|
||
case "sum_disk_write_time":
|
||
sumDiskWTime = toFloat64(dest[i])
|
||
case "sum_disk_reads":
|
||
sumDiskReads = toFloat64(dest[i])
|
||
case "sum_disk_writes":
|
||
sumDiskWrites = toFloat64(dest[i])
|
||
case "sum_packets_out":
|
||
sumPacketsOut = toFloat64(dest[i])
|
||
default:
|
||
if strings.HasPrefix(col, "avg_param_") {
|
||
name := strings.TrimPrefix(col, "avg_param_")
|
||
fv.ParamAvg[name] = toFloat64(dest[i])
|
||
}
|
||
if strings.HasPrefix(col, "svc_") {
|
||
name := strings.TrimPrefix(col, "svc_")
|
||
fv.ServiceStatuses[name] = toFloat64(dest[i])
|
||
}
|
||
}
|
||
}
|
||
|
||
// Derived calculations from raw sums
|
||
fv.AvgNetThroughput = fv.AvgNetInMBps + fv.AvgNetOutMBps
|
||
fv.AvgDiskLatencyMS = (sumDiskRTime + sumDiskWTime) / (sumDiskReads + sumDiskWrites + 1e-3)
|
||
fv.RetransPerPacket = fv.SumTCPRetrans / (sumPacketsOut + 1e-3)
|
||
|
||
// Build NormalizedVector from sc_* columns in canonical feature order.
|
||
scaledMap := make(map[string]float64)
|
||
for i, col := range cols {
|
||
if dest[i] != nil && strings.HasPrefix(col, "sc_") {
|
||
scaledMap[col] = toFloat64(dest[i])
|
||
}
|
||
}
|
||
scOrder := make([]string, 0, len(scalerFeatureNames))
|
||
for _, name := range scalerFeatureNames {
|
||
scOrder = append(scOrder, "sc_"+name)
|
||
}
|
||
|
||
scaled := make([]float64, len(scOrder))
|
||
for i, key := range scOrder {
|
||
scaled[i] = scaledMap[key]
|
||
}
|
||
fv.NormalizedVector = scaled
|
||
return fv, nil
|
||
}
|
||
|
||
// ── persistence ───────────────────────────────────────────────────────────────
|
||
|
||
func (e *TransformEngine) persistFeatureVector(fv *types.FeatureVector) error {
|
||
featureMap := fv.ToNamedMap(e.cfg.NumericPatternNames())
|
||
|
||
jsonBytes, err := json.Marshal(featureMap)
|
||
if err != nil {
|
||
return fmt.Errorf("marshal feature json: %w", err)
|
||
}
|
||
|
||
if _, err := e.db.Exec(e.featureInsertSQL, fv.WindowStart, string(jsonBytes)); err != nil {
|
||
return fmt.Errorf("insert features row: %w", err)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// UpdateAnomalyScore back-fills the anomaly score for a completed window.
|
||
// Called by the detection stage after scoring the FeatureVector.
|
||
func (e *TransformEngine) UpdateAnomalyScore(window time.Time, score float64) {
|
||
const q = `UPDATE features SET anomaly_score = ? WHERE window_start = ?`
|
||
if _, err := e.db.Exec(q, score, window); err != nil {
|
||
log.Printf("transform: UpdateAnomalyScore window=%s: %v",
|
||
window.Format(time.RFC3339), err)
|
||
}
|
||
}
|
||
|
||
// ── health ────────────────────────────────────────────────────────────────────
|
||
|
||
func (e *TransformEngine) emitHealth() {
|
||
e.mu.Lock()
|
||
p := e.processed
|
||
l := e.avgLatency
|
||
e.mu.Unlock()
|
||
|
||
select {
|
||
case e.healthChan <- types.StageHealth{
|
||
StageName: "transform_engine",
|
||
EventsProcessed: p,
|
||
AvgLatencyMs: l,
|
||
LastUpdate: time.Now(),
|
||
}:
|
||
default:
|
||
}
|
||
}
|
||
|
||
// ── type helpers ──────────────────────────────────────────────────────────────
|
||
|
||
// toFloat64 converts a value scanned from a DuckDB result row into a float64
// for field assignment.
//
// Every built-in Go floating-point and integer type is accepted, so narrower
// integer results (for example int32 or uint8) are converted rather than
// silently collapsing to 0. Any other type, including nil, yields 0.
func toFloat64(v any) float64 {
	switch t := v.(type) {
	case float64:
		return t
	case float32:
		return float64(t)
	case int64:
		return float64(t)
	case uint64:
		return float64(t)
	case int:
		return float64(t)
	case int32:
		return float64(t)
	case int16:
		return float64(t)
	case int8:
		return float64(t)
	case uint:
		return float64(t)
	case uint32:
		return float64(t)
	case uint16:
		return float64(t)
	case uint8:
		return float64(t)
	}
	return 0
}
|
||
|
||
// toInt64 converts a value scanned from a DuckDB result row into an int64.
//
// Every built-in Go integer type is accepted, so narrower integer results
// (for example int32 or uint16) are converted rather than silently collapsing
// to 0. Floating-point inputs are truncated toward zero. Any other type,
// including nil, yields 0.
func toInt64(v any) int64 {
	switch t := v.(type) {
	case int64:
		return t
	case uint64:
		return int64(t)
	case float64:
		return int64(t)
	case float32:
		return int64(t)
	case int:
		return int64(t)
	case int32:
		return int64(t)
	case int16:
		return int64(t)
	case int8:
		return int64(t)
	case uint:
		return int64(t)
	case uint32:
		return int64(t)
	case uint16:
		return int64(t)
	case uint8:
		return int64(t)
	}
	return 0
}
|
||
|
||
// calcStdDev computes the Bessel-corrected (n-1 denominator) sample standard
// deviation of data using a two-pass mean / squared-deviation calculation.
// Inputs with fewer than two values have no sample deviation and yield 0.
func calcStdDev(data []float64) float64 {
	if len(data) < 2 {
		return 0.0
	}
	count := float64(len(data))

	// Pass 1: arithmetic mean.
	total := 0.0
	for i := 0; i < len(data); i++ {
		total += data[i]
	}
	mean := total / count

	// Pass 2: sum of squared deviations from the mean.
	sumSq := 0.0
	for i := 0; i < len(data); i++ {
		diff := data[i] - mean
		sumSq += diff * diff
	}

	return math.Sqrt(sumSq / (count - 1))
}
|