diff --git a/config.go b/config.go index 6527410..249c246 100644 --- a/config.go +++ b/config.go @@ -96,7 +96,17 @@ type Config struct { Level string `mapstructure:"level"` FilePath string `mapstructure:"file_path"` } `mapstructure:"logging"` - PatternsFile string `mapstructure:"patterns_file"` + PatternsFile string `mapstructure:"patterns_file"` + Drain3 Drain3Config `mapstructure:"drain3"` +} + +type Drain3Config struct { + Enabled bool `mapstructure:"enabled"` + StateDir string `mapstructure:"state_dir"` + Depth int `mapstructure:"depth"` + SimThreshold float64 `mapstructure:"sim_th"` + MaxChildren int `mapstructure:"max_children"` + SaveIntervalSeconds int `mapstructure:"save_interval"` } type StorageRotationConfig struct { @@ -153,6 +163,11 @@ func setConfigDefaults() { viper.SetDefault("localstorage.rotation.check_interval_minutes", 5) viper.SetDefault("localstorage.rotation.archive_dir", "") viper.SetDefault("patterns_file", "./configs/patterns.yaml") + viper.SetDefault("drain3.enabled", true) + viper.SetDefault("drain3.state_dir", "./drain3_states") + viper.SetDefault("drain3.depth", 4) + viper.SetDefault("drain3.sim_th", 0.4) + viper.SetDefault("drain3.max_children", 100) } func LoadConfig() (*Config, error) { diff --git a/elasticsearch.go b/elasticsearch.go index 6b217c8..e40677a 100644 --- a/elasticsearch.go +++ b/elasticsearch.go @@ -70,7 +70,6 @@ func (esc *ElasticsearchClient) SendBatch(baseIndex string, entries []models.Log var body strings.Builder for _, entry := range entries { - // indexName := determineIndexName(baseIndex, entry) indexName := "tixel" indexLine := fmt.Sprintf(`{"index":{"_index":"%s"}}`, indexName) @@ -146,14 +145,3 @@ func (esc *ElasticsearchClient) SendSystemMetrics(baseIndex string, metrics mode return nil } - -// func determineIndexName(baseIndex string, entry LogEntry) string { -// switch entry.Type { -// case "system_metrics": -// return fmt.Sprintf("%s-system", baseIndex) -// case "service_log": -// return fmt.Sprintf("%s-service-%s", baseIndex, entry.Service) -// default: -// return fmt.Sprintf("%s-%s", baseIndex, entry.Tool) -// } -// } diff --git a/file_monitor.go b/file_monitor.go index 85fc9a7..608bd17 100644 --- a/file_monitor.go +++ b/file_monitor.go @@ -10,6 +10,7 @@ import ( "watch-tool/parser" "watch-tool/patterns" + "codeberg.org/pata1704/drain3" "github.com/hpcloud/tail" ) @@ -19,16 +20,25 @@ type FileMonitor struct { hostname string } -func NewFileMonitor(config ToolConfig, hostname string) *FileMonitor { +func NewFileMonitor(config ToolConfig, hostname string, drainCfg *drain3.Config, stateDir string) *FileMonitor { var logParser parser.Parser + pCfg := parser.ParserConfig{ + ServiceName: config.Name, + LogType: "custom", + Hostname: hostname, + DrainConfig: drainCfg, + StateDir: stateDir, + } + if config.Format.Pattern != "" { compiledRegex, err := regexp.Compile(config.Format.Pattern) if err != nil { slog.Error("Invalid regex pattern in tool config", "tool", config.Name, "error", err) - logParser = parser.NewGenericParser(config.Name, hostname) + logParser = parser.NewGenericParser(config.Name, hostname, pCfg.DrainConfig, pCfg.StateDir) + } else { - gp := parser.NewGenericParser(config.Name, hostname) + gp := parser.NewGenericParser(config.Name, hostname, pCfg.DrainConfig, pCfg.StateDir) customExtractor := patterns.CompiledExtractor{ Name: "config_custom_pattern", @@ -41,10 +51,10 @@ func NewFileMonitor(config ToolConfig, hostname string) *FileMonitor { } } else { var err error - logParser, err = parser.New(config.Name, "custom", hostname) + logParser, err = parser.New(pCfg) if err != nil { slog.Error("Cannot get tool specific parser from factory", "error", err) - logParser = parser.NewGenericParser(config.Name, hostname) + logParser = parser.NewGenericParser(config.Name, hostname, pCfg.DrainConfig, pCfg.StateDir) } } @@ -56,6 +66,7 @@ func NewFileMonitor(config ToolConfig, hostname string) *FileMonitor { } func (fm *FileMonitor) Start(ctx context.Context, out chan<- models.LogMessage) error { + defer fm.parser.Close() t, err := tail.TailFile(fm.config.LogFile, tail.Config{ Follow: true, ReOpen: true, diff --git a/go.mod b/go.mod index 15dd72a..648e926 100644 --- a/go.mod +++ b/go.mod @@ -1,15 +1,16 @@ module watch-tool -go 1.24.1 +go 1.25.5 require ( + codeberg.org/pata1704/drain3 v1.0.0 github.com/elastic/go-elasticsearch/v8 v8.19.0 github.com/hpcloud/tail v1.0.0 github.com/shirou/gopsutil v3.21.11+incompatible github.com/spf13/viper v1.20.1 - golang.org/x/sys v0.34.0 + golang.org/x/sys v0.37.0 gopkg.in/yaml.v3 v3.0.1 - modernc.org/sqlite v1.39.0 + modernc.org/sqlite v1.44.1 ) require ( @@ -22,7 +23,7 @@ require ( github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/google/uuid v1.6.0 // indirect github.com/mattn/go-isatty v0.0.20 // indirect - github.com/ncruces/go-strftime v0.1.9 // indirect + github.com/ncruces/go-strftime v1.0.0 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/sagikazarmark/locafero v0.7.0 // indirect @@ -39,11 +40,11 @@ require ( go.opentelemetry.io/otel/trace v1.29.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect - golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect + golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect golang.org/x/text v0.21.0 // indirect gopkg.in/fsnotify.v1 v1.4.7 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect - modernc.org/libc v1.66.3 // indirect + modernc.org/libc v1.67.6 // indirect modernc.org/mathutil v1.7.1 // indirect modernc.org/memory v1.11.0 // indirect ) diff --git a/go.sum b/go.sum index 01d3bef..c69400f 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +codeberg.org/pata1704/drain3 v1.0.0 h1:X66fn+lnzOMU+PFFSkNBF89z1ghbqihE1I4A6x/OJIM= +codeberg.org/pata1704/drain3 v1.0.0/go.mod h1:+K1hIYh3hNSPiXRxUin6ZiC2CC9FDGqQKNNR+7ZIx9s= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -26,6 +28,8 @@ github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17k github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -34,8 +38,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= -github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= +github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M= github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -82,20 +86,20 @@ go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= -golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o= -golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8= -golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w= -golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww= -golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= -golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 h1:mgKeJMpvi0yx/sU5GsxQ7p6s2wtOnGAHZWCHUM4KGzY= +golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70= +golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA= +golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= -golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= +golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo= -golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg= +golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ= +golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -105,18 +109,20 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkep gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -modernc.org/cc/v4 v4.26.2 h1:991HMkLjJzYBIfha6ECZdjrIYz2/1ayr+FL8GN+CNzM= -modernc.org/cc/v4 v4.26.2/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0= -modernc.org/ccgo/v4 v4.28.0 h1:rjznn6WWehKq7dG4JtLRKxb52Ecv8OUGah8+Z/SfpNU= -modernc.org/ccgo/v4 v4.28.0/go.mod h1:JygV3+9AV6SmPhDasu4JgquwU81XAKLd3OKTUDNOiKE= -modernc.org/fileutil v1.3.8 h1:qtzNm7ED75pd1C7WgAGcK4edm4fvhtBsEiI/0NQ54YM= -modernc.org/fileutil v1.3.8/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc= +modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis= +modernc.org/cc/v4 v4.27.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0= +modernc.org/ccgo/v4 v4.30.1 h1:4r4U1J6Fhj98NKfSjnPUN7Ze2c6MnAdL0hWw6+LrJpc= +modernc.org/ccgo/v4 v4.30.1/go.mod h1:bIOeI1JL54Utlxn+LwrFyjCx2n2RDiYEaJVSrgdrRfM= +modernc.org/fileutil v1.3.40 h1:ZGMswMNc9JOCrcrakF1HrvmergNLAmxOPjizirpfqBA= +modernc.org/fileutil v1.3.40/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc= modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI= modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito= +modernc.org/gc/v3 v3.1.1 h1:k8T3gkXWY9sEiytKhcgyiZ2L0DTyCQ/nvX+LoCljoRE= +modernc.org/gc/v3 v3.1.1/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY= modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks= modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI= -modernc.org/libc v1.66.3 h1:cfCbjTUcdsKyyZZfEUKfoHcP3S0Wkvz3jgSzByEWVCQ= -modernc.org/libc v1.66.3/go.mod h1:XD9zO8kt59cANKvHPXpx7yS2ELPheAey0vjIuZOhOU8= +modernc.org/libc v1.67.6 h1:eVOQvpModVLKOdT+LvBPjdQqfrZq+pC39BygcT+E7OI= +modernc.org/libc v1.67.6/go.mod h1:JAhxUVlolfYDErnwiqaLvUqc8nfb2r6S6slAgZOnaiE= modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= @@ -125,8 +131,8 @@ modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8= modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns= modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w= modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE= -modernc.org/sqlite v1.39.0 h1:6bwu9Ooim0yVYA7IZn9demiQk/Ejp0BtTjBWFLymSeY= -modernc.org/sqlite v1.39.0/go.mod h1:cPTJYSlgg3Sfg046yBShXENNtPrWrDX8bsbAQBzgQ5E= +modernc.org/sqlite v1.44.1 h1:qybx/rNpfQipX/t47OxbHmkkJuv2JWifCMH8SVUiDas= +modernc.org/sqlite v1.44.1/go.mod h1:CzbrU2lSB1DKUusvwGz7rqEKIq+NUd8GWuBBZDs9/nA= modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= diff --git a/main.go b/main.go index 5c67513..6bf2735 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "codeberg.org/pata1704/drain3" "context" "log/slog" "os" @@ -39,6 +40,18 @@ func main() { } slog.Info("Regex patterns loaded successfully", "file", cfg.PatternsFile) + var d3Cfg *drain3.Config + if cfg.Drain3.Enabled { + d3Cfg = &drain3.Config{ + Depth: cfg.Drain3.Depth, + SimTh: cfg.Drain3.SimThreshold, + MaxChildren: cfg.Drain3.MaxChildren, + } + slog.Info("Drain3 anomaly detection enabled", "state_dir", cfg.Drain3.StateDir) + } else { + slog.Info("Drain3 anomaly detection disabled") + } + var storage StorageInterface if cfg.LocalStorage.Enable { rotationConfig := StorageRotationConfig{ @@ -123,7 +136,7 @@ func main() { helpers.SafeGo(ctx, "ServiceMonitor-"+srv.Name, func() { defer wg.Done() - monitor := NewServiceMonitor(srv, currentHostname) + monitor := NewServiceMonitor(srv, currentHostname, d3Cfg, cfg.Drain3.StateDir) if err := monitor.Start(ctx, logChan); err != nil { slog.Error("Error watching service", "service", srv.Name, "error", err) @@ -145,7 +158,7 @@ func main() { helpers.SafeGo(ctx, "FileMonitor-"+t.Name, func() { defer wg.Done() - monitor := NewFileMonitor(t, currentHostname) + monitor := NewFileMonitor(t, currentHostname, d3Cfg, cfg.Drain3.StateDir) if err := monitor.Start(ctx, logChan); err != nil { slog.Error("Error watching tool", "tool", t.Name, "error", err) diff --git a/models/models.go b/models/models.go index 5c9cfee..8e8990e 100644 --- a/models/models.go +++ b/models/models.go @@ -136,18 +136,8 @@ type LogMessage struct { BootID string `json:"boot_id,omitempty"` MachineID string `json:"machine_id,omitempty"` Fields map[string]any `json:"fields,omitempty"` - // SyslogInfo SyslogFields `json:"syslog_information,omitempty"` } -// type LogMessage struct { -// Service string `json:"service"` -// Timestamp time.Time `json:"timestamp"` -// LogLevel string `json:"log_level"` -// LogMessage string `json:"log_message"` -// SyslogInfo SyslogFields `json:"syslog_information"` -// ServiceInformation any `json:"service_info,omitempty"` -// } - type SyslogFields struct { SysLogTimestamp time.Time `json:"syslog_timestamp"` Hostname string `json:"hostname"` diff --git a/parser/factory.go b/parser/factory.go index 4293b0f..ba4afd0 100644 --- a/parser/factory.go +++ b/parser/factory.go @@ -1,10 +1,20 @@ package parser -func New(serviceName, logType, hostname string) (Parser, error) { - switch logType { +import "codeberg.org/pata1704/drain3" + +type ParserConfig struct { + ServiceName string + LogType string + Hostname string + DrainConfig *drain3.Config + StateDir string +} + +func New(cfg ParserConfig) (Parser, error) { + switch cfg.LogType { case "json": return &JSONParser{}, nil default: - return NewGenericParser(serviceName, hostname), nil + return NewGenericParser(cfg.ServiceName, cfg.Hostname, cfg.DrainConfig, cfg.StateDir), nil } } diff --git a/parser/generic_parser.go b/parser/generic_parser.go index af62eba..dd666c3 100644 --- a/parser/generic_parser.go +++ b/parser/generic_parser.go @@ -3,21 +3,28 @@ package parser import ( "fmt" "log/slog" + "os" + "path/filepath" "strconv" "strings" "time" "watch-tool/models" "watch-tool/patterns" + + "codeberg.org/pata1704/drain3" ) type GenericParser struct { - ServiceName string - Hostname string - Extractors []patterns.CompiledExtractor - CommonExt []patterns.CompiledExtractor + ServiceName string + Hostname string + Extractors []patterns.CompiledExtractor + CommonExt []patterns.CompiledExtractor + drainMiner *drain3.TemplateMiner + drainEnabled bool + drainStatePath string } -func NewGenericParser(serviceName, hostname string) *GenericParser { +func NewGenericParser(serviceName, hostname string, drainCfg *drain3.Config, stateDir string) *GenericParser { repo := patterns.GetInstance() var svcExt, commonExt []patterns.CompiledExtractor @@ -28,12 +35,36 @@ func NewGenericParser(serviceName, hostname string) *GenericParser { slog.Error("CRITICAL: Pattern Repository is nil. Parser will not work correctly.") } - return &GenericParser{ + parser := &GenericParser{ ServiceName: serviceName, Hostname: hostname, Extractors: svcExt, CommonExt: commonExt, } + if drainCfg != nil && stateDir != "" { + parser.drainEnabled = true + + parser.drainStatePath = filepath.Join(stateDir, serviceName+".bin") + + if err := os.MkdirAll(stateDir, 0755); err != nil { + slog.Error("Failed to create drain3 state dir", "error", err) + parser.drainEnabled = false + return parser + } + + persister := drain3.NewFilePersistence(parser.drainStatePath, false) + + state, err := persister.LoadState() + if err == nil && state != nil { + parser.drainMiner = drain3.NewTemplateMiner(drainCfg, persister) + slog.Info("Drain3 state loaded", "service", serviceName, "clusters", len(state.Clusters)) + } else { + parser.drainMiner = drain3.NewTemplateMiner(drainCfg, persister) + slog.Info("Drain3 initialized fresh", "service", serviceName) + } + } + + return parser } func (p *GenericParser) Parse(line string) (models.LogMessage, error) { @@ -51,6 +82,15 @@ func (p *GenericParser) Parse(line string) (models.LogMessage, error) { return entry, nil } + if p.drainEnabled && p.drainMiner != nil { + cluster := p.drainMiner.AddLogMessage(trimmedLine) + if cluster != nil { + entry.Fields["drain_template_id"] = cluster.ClusterID + entry.Fields["drain_template"] = cluster.TemplateMined + // Optional: Parameter extrahieren, die Drain gefunden hat (Wildcards) + } + } + allExtractors := append(p.CommonExt, p.Extractors...) matchedAny := false @@ -92,6 +132,17 @@ func (p *GenericParser) Parse(line string) (models.LogMessage, error) { return entry, nil } +func (p *GenericParser) Close() error { + if p.drainEnabled && p.drainMiner != nil { + if err := p.drainMiner.SaveState("shutdown"); err != nil { + slog.Error("Failed to save drain3 state", "service", p.ServiceName, "error", err) + return err + } + slog.Debug("Drain3 state saved", "service", p.ServiceName) + } + return nil +} + func (p *GenericParser) safeConvert(value, typeDef string) any { if value == "" || value == "-" { if strings.HasPrefix(typeDef, "int") || strings.HasPrefix(typeDef, "float") { diff --git a/parser/generic_parser_test.go b/parser/generic_parser_test.go new file mode 100644 index 0000000..13cfbac --- /dev/null +++ b/parser/generic_parser_test.go @@ -0,0 +1,198 @@ +package parser + +import ( + "log/slog" + "os" + "path/filepath" + "testing" + "watch-tool/patterns" + + "codeberg.org/pata1704/drain3" +) + +func setupPatterns(t *testing.T) { + content := ` +patterns: + common: + extractors: + - name: "syslog_header" + regex: '^\w{3} \d{2} \d{2}:\d{2}:\d{2} (?P\S+) .*' + fields: + hostname: "string" + + test_service: + extractors: + - name: "data_line" + regex: 'Data: id=(?P\d+) size=(?P[0-9.]+) active=(?Ptrue|false) empty=(?P\S+)' + fields: + id: "int" + size_mb: "float" + is_active: "bool" + empty_val: "int" # Testet Fallback bei "-" +` + tmpfile, err := os.CreateTemp("", "patterns_parser_test_*.yaml") + if err != nil { + t.Fatal(err) + } + defer os.Remove(tmpfile.Name()) + + if _, err := tmpfile.Write([]byte(content)); err != nil { + t.Fatal(err) + } + tmpfile.Close() + + if err := patterns.GetInstance().Load(tmpfile.Name()); err != nil { + t.Fatal(err) + } +} + +func TestGenericParser_Parse_Regex(t *testing.T) { + setupPatterns(t) + + p := NewGenericParser("test_service", "localhost", nil, "") + + line := "Sep 28 10:00:00 myhost Data: id=42 size=12.5 active=true empty=-" + entry, err := p.Parse(line) + if err != nil { + t.Fatalf("Parse failed: %v", err) + } + + if entry.Host != "myhost" { + t.Errorf("Expected host 'myhost', got '%s'", entry.Host) + } + + if val, ok := entry.Fields["id"].(int); !ok || val != 42 { + t.Errorf("Expected id=42 (int), got %v (%T)", entry.Fields["id"], entry.Fields["id"]) + } + + if val, ok := entry.Fields["size_mb"].(float64); !ok || val != 12.5 { + t.Errorf("Expected size_mb=12.5 (float), got %v", entry.Fields["size_mb"]) + } + + if val, ok := entry.Fields["is_active"].(bool); !ok || val != true { + t.Errorf("Expected is_active=true, got %v", entry.Fields["is_active"]) + } + + if val, ok := entry.Fields["empty_val"].(int); !ok || val != 0 { + t.Errorf("Expected empty_val=0 for '-', got %v", entry.Fields["empty_val"]) + } +} + +func TestGenericParser_Drain3_Integration(t *testing.T) { + setupPatterns(t) + + opts := &slog.HandlerOptions{Level: slog.LevelDebug} + logger := slog.New(slog.NewTextHandler(os.Stdout, opts)) + slog.SetDefault(logger) + + tmpDir, err := os.MkdirTemp("", "drain_state_test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + drainCfg := &drain3.Config{ + Depth: 4, + SimTh: 0.5, + MaxChildren: 100, + MaxClusters: 100, + } + + serviceName := "test_service" + p := NewGenericParser(serviceName, "localhost", drainCfg, tmpDir) + + log1 := "User admin logged in from 192.168.1.1" + log2 := "User guest logged in from 10.0.0.1" + + entry1, _ := p.Parse(log1) + if entry1.Fields["drain_template_id"] == nil { + t.Error("Drain3 did not assign a template ID for log1") + } + + entry2, _ := p.Parse(log2) + + id1 := entry1.Fields["drain_template_id"] + id2 := entry2.Fields["drain_template_id"] + t.Logf("IDs: %v -> %v", id1, id2) + t.Logf("Template 2: %s", entry2.Fields["drain_template"]) + + if err := p.Close(); err != nil { + t.Fatalf("Close failed: %v", err) + } + + expectedFile := filepath.Join(tmpDir, serviceName+".bin") + + if info, err := os.Stat(expectedFile); os.IsNotExist(err) { + t.Errorf("Drain3 state file NOT found at: %s", expectedFile) + + entries, _ := os.ReadDir(tmpDir) + t.Logf("Listing directory %s:", tmpDir) + for _, e := range entries { + t.Logf(" - Found file: %s", e.Name()) + } + } else { + t.Logf("Success: State file found (%d bytes)", info.Size()) + } +} + +func TestGenericParser_Robustness(t *testing.T) { + setupPatterns(t) + + p := NewGenericParser("test_service", "localhost", nil, "") + + tests := []struct { + name string + log string + checkField string + expectedValue any + shouldFail bool + }{ + { + name: "Empty Line", + log: "", + checkField: "", + expectedValue: nil, + shouldFail: false, + }, + { + name: "Type Mismatch Int (Text instead of Int)", + log: "Data: id=abc size=12.5 active=true empty=-", + checkField: "_parse_status", + expectedValue: "failed", + }, + { + name: "Value Missing (Dash) for Int", + log: "Data: id=1 size=1.0 active=true empty=-", + checkField: "empty_val", + expectedValue: 0, + }, + { + name: "Value Missing (Dash) for Float", + log: "Data: id=1 size=1.0 active=true empty=0", + checkField: "size_mb", + expectedValue: 1.0, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + entry, err := p.Parse(tc.log) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if tc.checkField != "" { + val, exists := entry.Fields[tc.checkField] + if tc.expectedValue == "failed" { + if !exists || val != "failed" { + t.Errorf("Expected parse failure status, got %v", val) + } + } else { + if val != tc.expectedValue { + t.Errorf("Field %s: expected %v, got %v", tc.checkField, tc.expectedValue, val) + } + } + } + }) + } +} diff --git a/parser/json_parser.go b/parser/json_parser.go index d83d068..ce270bf 100644 --- a/parser/json_parser.go +++ b/parser/json_parser.go @@ -17,3 +17,7 @@ func (j *JSONParser) Parse(line string) (models.LogMessage, error) { } return logMsg, nil } + +func (p *JSONParser) Close() error { + return nil +} diff --git a/parser/parser.go b/parser/parser.go index 53d3c63..52c01ad 100644 --- a/parser/parser.go +++ b/parser/parser.go @@ -6,4 +6,5 @@ import ( type Parser interface { Parse(line string) (models.LogMessage, error) + Close() error } diff --git a/patterns/repository.go b/patterns/repository.go index f8b8fd4..e814d91 100644 --- a/patterns/repository.go +++ b/patterns/repository.go @@ -1,93 +1,3 @@ -// package patterns - -// import ( -// "fmt" -// "regexp" -// "sync" - -// "gopkg.in/yaml.v3" -// "os" -// ) - -// type PatternConfig struct { -// Patterns map[string]map[string]PatternDefinition `yaml:"patterns"` -// } - -// type PatternDefinition struct { -// Regex string `yaml:"regex"` -// Description string `yaml:"description,omitempty"` -// } - -// type Repository struct { -// compiledPatterns map[string]map[string]*regexp.Regexp -// mu sync.RWMutex -// } - -// var ( -// instance *Repository -// once sync.Once -// ) - -// func GetInstance() *Repository { -// once.Do(func() { -// instance = &Repository{ -// compiledPatterns: make(map[string]map[string]*regexp.Regexp), -// } -// }) -// return instance -// } - -// func (r *Repository) Load(path string) error { -// r.mu.Lock() -// defer r.mu.Unlock() - -// data, err := os.ReadFile(path) -// if err != nil { -// return fmt.Errorf("failed to read pattern config: %w", err) -// } - -// var config PatternConfig -// if err := yaml.Unmarshal(data, &config); err != nil { -// return fmt.Errorf("failed to parse pattern config: %w", err) -// } - -// for service, patterns := range config.Patterns { -// if _, exists := r.compiledPatterns[service]; !exists { -// r.compiledPatterns[service] = make(map[string]*regexp.Regexp) -// } - -// for name, def := range patterns { -// compiled, err := regexp.Compile(def.Regex) -// if err != nil { -// return fmt.Errorf("invalid regex for %s/%s: %w", service, name, err) -// } -// r.compiledPatterns[service][name] = compiled -// } -// } - -// return nil -// } - -// func (r *Repository) Get(service string, name string) (*regexp.Regexp, error) { -// r.mu.RLock() -// defer r.mu.RUnlock() - -// if svcPatterns, ok := r.compiledPatterns[service]; ok { -// if pattern, ok := svcPatterns[name]; ok { -// return pattern, nil -// } -// } - -// return nil, fmt.Errorf("pattern not found: %s/%s", service, name) -// } - -// func (r *Repository) MustGet(service string, name string) *regexp.Regexp { -// p, err := r.Get(service, name) -// if err != nil { -// panic(err) -// } -// return p -// } package patterns import ( @@ -99,7 +9,6 @@ import ( "gopkg.in/yaml.v3" ) -// Struktur der YAML Datei type Config struct { Patterns map[string]ServiceConfig `yaml:"patterns"` } @@ -111,10 +20,9 @@ type ServiceConfig struct { type ExtractorConfig struct { Name string `yaml:"name"` Regex string `yaml:"regex"` - Fields map[string]string `yaml:"fields"` // Name -> Typ (int, float, string) + Fields map[string]string `yaml:"fields"` } -// Interne kompilierte Struktur type CompiledExtractor struct { Name string Pattern *regexp.Regexp diff --git a/patterns/repository_test.go b/patterns/repository_test.go new file mode 100644 index 0000000..2643483 --- /dev/null +++ b/patterns/repository_test.go @@ -0,0 +1,51 @@ +package patterns + +import ( + "os" + "testing" +) + +func TestRepository_Load(t *testing.T) { + content := ` +patterns: + test_service: + extractors: + - name: "test_pattern" + regex: '^Test (?P\d+) (?P\d+\.\d+)$' + fields: + id: "int" + value: "float" +` + tmpfile, err := os.CreateTemp("", "patterns_test_*.yaml") + if err != nil { + t.Fatal(err) + } + defer os.Remove(tmpfile.Name()) + + if _, err := tmpfile.Write([]byte(content)); err != nil { + t.Fatal(err) + } + if err := tmpfile.Close(); err != nil { + t.Fatal(err) + } + + repo := GetInstance() + err = repo.Load(tmpfile.Name()) + if err != nil { + t.Fatalf("Failed to load repository: %v", err) + } + + extractors := repo.GetExtractors("test_service") + if len(extractors) != 1 { + t.Errorf("Expected 1 extractor, got %d", len(extractors)) + } + + ext := extractors[0] + if ext.Name != "test_pattern" { + t.Errorf("Expected name 'test_pattern', got '%s'", ext.Name) + } + + if !ext.Pattern.MatchString("Test 123 45.67") { + t.Error("Regex did not match valid string") + } +} diff --git a/service_monitor.go b/service_monitor.go index 2320130..9eb1118 100644 --- a/service_monitor.go +++ b/service_monitor.go @@ -2,12 +2,12 @@ package main import ( "bufio" + "codeberg.org/pata1704/drain3" "context" "encoding/json" "fmt" "log/slog" "os/exec" - "regexp" "strconv" "strings" "time" @@ -16,14 +16,18 @@ import ( ) type ServiceMonitor struct { - config ServiceConfig - hostname string + config ServiceConfig + hostname string + drainConfig *drain3.Config + stateDir string } -func NewServiceMonitor(config ServiceConfig, hostname string) *ServiceMonitor { +func NewServiceMonitor(config ServiceConfig, hostname string, drainCfg *drain3.Config, stateDir string) *ServiceMonitor { return &ServiceMonitor{ - config: config, - hostname: hostname, + config: config, + hostname: hostname, + drainConfig: drainCfg, + stateDir: stateDir, } } @@ -52,7 +56,8 @@ func (sm *ServiceMonitor) Start(ctx context.Context, out chan<- models.LogMessag } }() - parser := NewJournalEntryParser(sm.config.Name, sm.config.Service, sm.hostname) + jParser := NewJournalEntryParser(sm.config.Name, sm.config.Service, sm.hostname, sm.drainConfig, sm.stateDir) + defer jParser.Close() for scanner.Scan() { select { @@ -66,7 +71,7 @@ func (sm *ServiceMonitor) Start(ctx context.Context, out chan<- models.LogMessag continue } - entry, err := parser.Parse(line) + entry, err := jParser.Parse(line) if err != nil { slog.Error("error parsing journal entry", "service", sm.config.Name, "error", err) continue @@ -112,16 +117,38 @@ type JournalEntryParser struct { serviceName string unitName string hostname string + innerParser parser.Parser } -func NewJournalEntryParser(serviceName, unitName, hostname string) *JournalEntryParser { +func NewJournalEntryParser(serviceName, unitName, hostname string, drainCfg *drain3.Config, stateDir string) *JournalEntryParser { + pCfg := parser.ParserConfig{ + ServiceName: serviceName, + LogType: "custom", + Hostname: hostname, + DrainConfig: drainCfg, + StateDir: stateDir, + } + + inner, err := parser.New(pCfg) + if err != nil { + slog.Error("Failed to create inner parser for service", "service", serviceName, "error", err) + } + return &JournalEntryParser{ serviceName: serviceName, unitName: unitName, hostname: hostname, + innerParser: inner, } } +func (jep *JournalEntryParser) Close() error { + if jep.innerParser != nil { + return jep.innerParser.Close() + } + return nil +} + func (jep *JournalEntryParser) Parse(jsonLine string) (models.LogMessage, error) { var journalData map[string]any if err := json.Unmarshal([]byte(jsonLine), &journalData); err != nil { @@ -170,11 +197,25 @@ func (jep *JournalEntryParser) Parse(jsonLine string) (models.LogMessage, error) entry.Raw = jsonLine - entry = jep.parseServiceSpecific(entry) + if jep.innerParser != nil && entry.LogMessage != "" { + parsedMsg, err := jep.innerParser.Parse(entry.LogMessage) + if err == nil { + jep.mergeEntries(&entry, &parsedMsg) + } + } return entry, nil } +func (jep *JournalEntryParser) mergeEntries(target *models.LogMessage, source *models.LogMessage) { + for k, v := range source.Fields { + target.Fields[k] = v + } + if source.LogLevel != "" { + target.LogLevel = source.LogLevel + } +} + func (jep *JournalEntryParser) getPriorityName(priority string) string { priorityNames := map[string]string{ "0": "emergency", @@ -213,29 +254,3 @@ func (jep *JournalEntryParser) extractSystemdFields(journalData map[string]any, } } } - -func (jep *JournalEntryParser) parseServiceSpecific(entry models.LogMessage) models.LogMessage { - logParser, err := parser.New(jep.serviceName, "custom", jep.hostname) - if err != nil { - slog.Error("cannot get service specific parser") - return entry - } - entry, err = logParser.Parse(entry.LogMessage) - return entry -} - -var ( - amServicePattern = regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z)\s+(\w+)\s+(\d+)\s+---\s+\[\s*([^\]]*)\]\s+([\w\.]+)\s*:\s*(.*)$`) - tccServicePattern = regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z)\s+(\w+)\s+(\d+)\s+---\s+\[\s*([^\]]*)\]\s+([\w\.]+)\s*:\s*(.*)$`) - tjmServicePattern = regexp.MustCompile(`^(?