// Package types defines the shared data structures that flow between pipeline
// stages. All types are plain data and JSON-serialisable with encoding/json,
// provided no float64 field holds NaN or ±Inf (encoding/json rejects those).
//
// Copying a value is only a shallow copy: map and slice fields
// (LogEvent.Params, FeatureVector.ParamAvg, FeatureVector.ServiceStatuses and
// FeatureVector.NormalizedVector) keep sharing their backing storage with the
// original, so clone them before mutating a copy another stage may still read.
package types

import "time"

// ── LogEvent ─────────────────────────────────────────────────────────────────

// LogEvent represents a single parsed log line after Drain3 template mining.
type LogEvent struct {
	Timestamp  time.Time         `json:"timestamp"`
	TemplateID int               `json:"template_id"`
	Params     map[string]string `json:"params"` // reference type: shared by copies of the event
	Severity   string            `json:"severity"`
	RawLine    string            `json:"raw_line"`
}

// ── ServiceStatus ─────────────────────────────────────────────────────────────

// ServiceStatus represents the state of a systemd service.
type ServiceStatus struct {
	Timestamp   time.Time `json:"timestamp"`
	ServiceName string    `json:"service_name"`
	ActiveState string    `json:"active_state"` // e.g. "active", "inactive", "failed"
	SubState    string    `json:"sub_state"`    // e.g. "running", "dead", "exited"
}

// ── MetricSnapshot ────────────────────────────────────────────────────────────

// MetricSnapshot is a 1 Hz sample of Linux system metrics collected from /proc.
type MetricSnapshot struct {
	// Field order and JSON tags form the wire format. Several tags do not
	// mirror their field names (noted inline); do not "fix" them without
	// updating every consumer.
	Timestamp time.Time `json:"timestamp"`

	// CPU and scheduler.
	CPUPercent          float64 `json:"cpu_percent"`
	CPUIoWaitPercent    float64 `json:"cpu_iowait_percent"`
	CPUSoftIrqPercent   float64 `json:"cpu_softirq_percent"`
	ContextSwitchesPerS float64 `json:"context_switches_s"`
	InterruptsPerS      float64 `json:"interrupts_s"`

	// Memory.
	MemoryUsedMB   float64 `json:"memory_used_mb"`
	MemoryCachedMB float64 `json:"memory_cached_mb"`
	MemoryDirtyMB  float64 `json:"memory_dirty_mb"`

	// Network throughput and errors.
	// NOTE(review): field names say "MBps" while the tags say "mbps"; confirm
	// with the collector whether these are megabytes or megabits per second.
	NetworkInMBps  float64 `json:"net_in_mbps"`
	NetworkOutMBps float64 `json:"net_out_mbps"`
	NetErrorsPerS  float64 `json:"network_errors_s"` // tag uses "network_", not "net_"
	NetDropsPerS   float64 `json:"network_drops_s"`  // tag uses "network_", not "net_"

	// TCP.
	TCPRetransPerS        float64 `json:"tcp_retrans_s"`
	TCPTimeoutsPerS       float64 `json:"tcp_timeouts_s"`
	TCPLostRetransmitPerS float64 `json:"tcp_lost_retransmit_s"`
	TCPFastRetransPerS    float64 `json:"tcp_fast_retrans_s"`

	// Disk.
	DiskReadMBps        float64 `json:"disk_read_mbps"`
	DiskWriteMBps       float64 `json:"disk_write_mbps"`
	DiskReadTimeMsPerS  float64 `json:"disk_read_time_s"`  // tag omits the "ms" unit in the field name
	DiskWriteTimeMsPerS float64 `json:"disk_write_time_s"` // tag omits the "ms" unit in the field name
	DiskIOTicksPerS     float64 `json:"disk_io_ticks_s"`

	// Softnet.
	SoftnetDroppedPerS     float64 `json:"softnet_dropped_s"`
	SoftnetTimeSqueezePerS float64 `json:"softnet_time_squeeze_s"`

	// Packet and completed-disk-operation rates.
	NetPacketsInPerS        float64 `json:"net_packets_in_s"`
	NetPacketsOutPerS       float64 `json:"net_packets_out_s"`
	DiskReadsCompletedPerS  float64 `json:"disk_reads_s"`
	DiskWritesCompletedPerS float64 `json:"disk_writes_s"`
}

// ── FeatureVector ─────────────────────────────────────────────────────────────

// FeatureVector is the output of the DuckDB Tumbling-Window fusion layer.
//
// The named fields hold raw (unscaled) window aggregates and derived features;
// NormalizedVector carries the flat vector consumed by the anomaly detectors.
//
// # NormalizedVector layout
//
//	Slot  0– 4: CPU     (DuckDB RobustScaled)
//	            0=avg_cpu 1=max_cpu 2=avg_iowait 3=avg_softirq 4=avg_ctx_switches
//	Slot  5– 7: Memory  (DuckDB RobustScaled)
//	            5=avg_mem_used 6=avg_mem_cached 7=max_mem_dirty
//	Slot     8: Disk    (DuckDB RobustScaled)
//	            8=avg_disk_io_ticks
//	Slot  9–12: Network (DuckDB RobustScaled)
//	            9=avg_net_in 10=avg_net_out 11=avg_net_drops 12=avg_softnet_squeeze
//	Slot 13–16: TCP     (DuckDB RobustScaled)
//	            13=max_tcp_retrans 14=sum_tcp_fast_retrans
//	            15=sum_tcp_timeouts 16=sum_tcp_lost_retrans
//	Slot 17–20: Log     (DuckDB RobustScaled)
//	            17=log_event_count 18=error_count 19=unique_templates 20=error_rate
//	Slot 21:  CPUDelta        – Δavg_cpu vs previous window, %-points        (unscaled)
//	Slot 22:  RatioTCPNet     – sum_tcp_retrans / (avg_net_out + 1e-3), CV=10
//	Slot 23:  DeltaCtx        – Δavg_ctx_switches vs previous window, CV=6.2
//	Slot 24:  NetDelta        – Δavg_net_out vs previous window, MBps        (unscaled)
//	Slot 25:  CPURollStd      – rolling σ(avg_cpu, 12 windows)               (unscaled)
//	Slot 26:  CPUEfficiency   – avg_cpu / (avg_net_out + 1)                  (unscaled)
//	Slot 27:  IOWaitProxy     – avg_disk_io_ticks / (avg_cpu + 1)            (unscaled)
//	Slot 28:  LogDensity      – unique_templates / (log_count + 1)           (unscaled)
//	Slot 29:  DeltaNetIn      – Δavg_net_in vs previous window, MBps         (unscaled)
//	Slot 30:  DeltaTCPRetrans – Δsum_tcp_retrans vs previous window          (unscaled)
//	Slot 31:  TcpRollStd      – rolling σ(sum_tcp_retrans, 5 windows)        (unscaled)
//	Slot 32:  NetRollStd      – rolling σ(avg_net_out, 5 windows)            (unscaled)
//	Slot 33:  MemPressure     – avg_dirty_mb / (avg_mem_used + 1)            (unscaled)
//	Slot 34:  NetAsymmetry    – avg_net_in / (avg_net_out + 1e-3)            (unscaled)
//	Slot 35+: Drain param averages                                          (unscaled)
//
// NOTE(review): slots 13 (max_tcp_retrans), 16 (sum_tcp_lost_retrans) and 22
// (RatioTCPNet) name aggregates that have no field on this struct, and slot 33
// refers to avg_dirty_mb while the struct only carries MaxMemDirtyMB. Confirm
// this table against the query that builds NormalizedVector.
type FeatureVector struct {
	Timestamp   time.Time `json:"timestamp"`
	WindowStart time.Time `json:"window_start"`
	WindowEnd   time.Time `json:"window_end"`

	// CPU aggregations
	AvgCPUPercent  float64 `json:"avg_cpu"`
	MaxCPUPercent  float64 `json:"max_cpu"`
	StdCPUPercent  float64 `json:"std_cpu"`
	AvgCPUIoWait   float64 `json:"avg_iowait"`
	StdCPUIoWait   float64 `json:"std_iowait"`
	AvgCPUSoftIrq  float64 `json:"avg_softirq"`
	AvgCtxSwitches float64 `json:"avg_ctx_switches"`
	AvgInterrupts  float64 `json:"avg_interrupts"`

	// Memory aggregations
	AvgMemUsedMB   float64 `json:"avg_mem_used"`
	AvgMemCachedMB float64 `json:"avg_mem_cached"`
	MaxMemDirtyMB  float64 `json:"max_mem_dirty"`

	// Disk aggregations
	AvgDiskIOTicks   float64 `json:"avg_disk_io_ticks"`
	StdDiskIOTicks   float64 `json:"std_disk_io_ticks"`
	AvgDiskReadMBps  float64 `json:"avg_disk_read"`
	AvgDiskWriteMBps float64 `json:"avg_disk_write"`

	// Network aggregations
	AvgNetInMBps      float64 `json:"avg_net_in"`
	StdNetInMBps      float64 `json:"std_net_in"`
	AvgNetOutMBps     float64 `json:"avg_net_out"`
	StdNetOutMBps     float64 `json:"std_net_out"`
	AvgNetDrops       float64 `json:"avg_net_drops"`
	AvgSoftnetDropped float64 `json:"avg_softnet_dropped"`
	AvgSoftnetSqueeze float64 `json:"avg_softnet_squeeze"`

	// TCP aggregations
	SumTCPRetrans     float64 `json:"sum_tcp_retrans"`
	SumTCPFastRetrans float64 `json:"sum_tcp_fast_retrans"`
	SumTCPTimeouts    float64 `json:"sum_tcp_timeouts"`

	// Log aggregations
	ErrorCount    int     `json:"error_count"`
	SeverityScore float64 `json:"severity_score"`

	// Engineered / Derived features
	CPUDelta         float64 `json:"cpu_delta"`
	CPURollStd       float64 `json:"cpu_roll_std"`
	CPUEfficiency    float64 `json:"cpu_efficiency"`
	DeltaCtx         float64 `json:"delta_ctx"`
	NetDelta         float64 `json:"net_delta"`
	AvgNetThroughput float64 `json:"avg_net_throughput"`
	CPUPerMB         float64 `json:"cpu_per_mb"`
	NetworkDiskRatio float64 `json:"network_disk_ratio"`
	RetransPerPacket float64 `json:"retrans_per_packet"`
	RetransPerMB     float64 `json:"retrans_per_mb"`
	AvgDiskLatencyMS float64 `json:"avg_disk_latency_ms"`
	LogCountTotal    int     `json:"log_count_total"`
	UniqueTemplates  int     `json:"unique_templates"`
	LogDensity       float64 `json:"log_density"`
	IOWaitProxy      float64 `json:"io_wait_proxy"`
	DeltaNetIn       float64 `json:"delta_net_in"`
	DeltaTCPRetrans  float64 `json:"delta_tcp_retrans"`
	TcpRollStd       float64 `json:"tcp_roll_std"`
	NetRollStd       float64 `json:"net_roll_std"`
	MemPressure      float64 `json:"mem_pressure"`
	NetAsymmetry     float64 `json:"net_asymmetry"`

	// Drain parameter aggregations
	ParamAvg map[string]float64 `json:"param_avg"`

	// ServiceStatuses maps service names to their encoded state (active=1, inactive=0, failed=-1).
	ServiceStatuses map[string]float64 `json:"service_statuses"`

	// NormalizedVector is the flat float64 slice consumed by anomaly detectors.
	NormalizedVector []float64 `json:"normalized_vector"`
}

// ToFloatSlice serialises fv to a deterministic []float64 for offline EDA.
// It returns raw (unscaled) values; use NormalizedVector for ML inference.
//
// The fixed features come first, in the order below; their names are the keys
// ToNamedMap uses. They are followed by fv.ParamAvg[name] for each name in
// paramNames, in paramNames order. A name missing from ParamAvg (or a nil
// ParamAvg) contributes 0.
//
//	[avg_cpu, max_cpu, std_cpu,
//	 avg_iowait, std_iowait, avg_softirq, avg_ctx_switches, avg_interrupts,
//	 avg_softnet_dropped, avg_softnet_squeeze,
//	 avg_net_in, std_net_in, avg_net_out, std_net_out,
//	 sum_tcp_retrans, sum_tcp_fast_retrans, sum_tcp_timeouts, avg_net_drops,
//	 avg_disk_read, avg_disk_write, avg_disk_io_ticks, std_disk_io_ticks,
//	 error_count, severity_score,
//	 cpu_delta, cpu_roll_std, cpu_efficiency, delta_ctx, net_delta,
//	 avg_net_throughput, cpu_per_mb, network_disk_ratio, retrans_per_packet,
//	 retrans_per_mb, avg_disk_latency_ms, log_count_total, unique_templates,
//	 log_density, io_wait_proxy, delta_net_in, delta_tcp_retrans,
//	 tcp_roll_std, net_roll_std, mem_pressure, net_asymmetry,
//	 ParamAvg[paramNames[0]], ParamAvg[paramNames[1]], ...]
func (fv FeatureVector) ToFloatSlice(paramNames []string) []float64 {
	cols := fv.baseFeatureColumns()
	out := make([]float64, 0, len(cols)+len(paramNames))
	for _, c := range cols {
		out = append(out, c.value)
	}
	// Indexing a nil map or a missing key yields 0, so absent params are zero-filled.
	for _, name := range paramNames {
		out = append(out, fv.ParamAvg[name])
	}
	return out
}

// ToNamedMap returns the fixed features of fv keyed by name. Names and raw
// values are the same as the leading columns of ToFloatSlice. It adds one
// "avg_param_<name>" entry per element of paramNames, holding
// fv.ParamAvg[name], or 0 when absent. Duplicate names in paramNames collapse
// into a single key.
func (fv FeatureVector) ToNamedMap(paramNames []string) map[string]float64 {
	cols := fv.baseFeatureColumns()
	m := make(map[string]float64, len(cols)+len(paramNames))
	for _, c := range cols {
		m[c.name] = c.value
	}
	for _, name := range paramNames {
		m["avg_param_"+name] = fv.ParamAvg[name]
	}
	return m
}

// featureColumn pairs a fixed feature's export name with its raw value.
type featureColumn struct {
	name  string
	value float64
}

// baseFeatureColumns lists the fixed (non-Drain-parameter) features of fv in
// export order. It is the single source of truth for ToFloatSlice, which
// relies on the order, and ToNamedMap, which relies on the names. Keeping
// them in one table stops the two exports drifting apart when a feature is
// added or removed.
func (fv FeatureVector) baseFeatureColumns() []featureColumn {
	return []featureColumn{
		// Base aggregates (24)
		{"avg_cpu", fv.AvgCPUPercent},
		{"max_cpu", fv.MaxCPUPercent},
		{"std_cpu", fv.StdCPUPercent},
		{"avg_iowait", fv.AvgCPUIoWait},
		{"std_iowait", fv.StdCPUIoWait},
		{"avg_softirq", fv.AvgCPUSoftIrq},
		{"avg_ctx_switches", fv.AvgCtxSwitches},
		{"avg_interrupts", fv.AvgInterrupts},
		{"avg_softnet_dropped", fv.AvgSoftnetDropped},
		{"avg_softnet_squeeze", fv.AvgSoftnetSqueeze},
		{"avg_net_in", fv.AvgNetInMBps},
		{"std_net_in", fv.StdNetInMBps},
		{"avg_net_out", fv.AvgNetOutMBps},
		{"std_net_out", fv.StdNetOutMBps},
		{"sum_tcp_retrans", fv.SumTCPRetrans},
		{"sum_tcp_fast_retrans", fv.SumTCPFastRetrans},
		{"sum_tcp_timeouts", fv.SumTCPTimeouts},
		{"avg_net_drops", fv.AvgNetDrops},
		{"avg_disk_read", fv.AvgDiskReadMBps},
		{"avg_disk_write", fv.AvgDiskWriteMBps},
		{"avg_disk_io_ticks", fv.AvgDiskIOTicks},
		{"std_disk_io_ticks", fv.StdDiskIOTicks},
		{"error_count", float64(fv.ErrorCount)},
		{"severity_score", fv.SeverityScore},

		// Engineered features (21)
		{"cpu_delta", fv.CPUDelta},
		{"cpu_roll_std", fv.CPURollStd},
		{"cpu_efficiency", fv.CPUEfficiency},
		{"delta_ctx", fv.DeltaCtx},
		{"net_delta", fv.NetDelta},
		{"avg_net_throughput", fv.AvgNetThroughput},
		{"cpu_per_mb", fv.CPUPerMB},
		{"network_disk_ratio", fv.NetworkDiskRatio},
		{"retrans_per_packet", fv.RetransPerPacket},
		{"retrans_per_mb", fv.RetransPerMB},
		{"avg_disk_latency_ms", fv.AvgDiskLatencyMS},
		{"log_count_total", float64(fv.LogCountTotal)},
		{"unique_templates", float64(fv.UniqueTemplates)},
		{"log_density", fv.LogDensity},
		{"io_wait_proxy", fv.IOWaitProxy},
		{"delta_net_in", fv.DeltaNetIn},
		{"delta_tcp_retrans", fv.DeltaTCPRetrans},
		{"tcp_roll_std", fv.TcpRollStd},
		{"net_roll_std", fv.NetRollStd},
		{"mem_pressure", fv.MemPressure},
		{"net_asymmetry", fv.NetAsymmetry},
	}
}

// ── AnomalyResult ─────────────────────────────────────────────────────────────

// AnomalyResult is the final output of the detection layer.
type AnomalyResult struct {
	Timestamp  time.Time `json:"timestamp"`
	Score      float64   `json:"score"`
	IsAnomaly  bool      `json:"is_anomaly"`
	Confidence float64   `json:"confidence"`
	Method     string    `json:"method"`
	Details    string    `json:"details,omitempty"` // omitted from JSON when empty
}

// ── StageHealth ───────────────────────────────────────────────────────────────

// StageHealth stores per-stage monitoring counters.
type StageHealth struct {
	StageName       string  `json:"stage_name"`
	EventsProcessed uint64  `json:"events_processed"`
	EventsDropped   uint64  `json:"events_dropped"`
	AvgLatencyMs    float64 `json:"avg_latency_ms"`
	// Throughput is serialised as "throughput_eps" (presumably events per
	// second — confirm with the stage that fills it in).
	Throughput float64   `json:"throughput_eps"`
	LastUpdate time.Time `json:"last_update"`
}