From 491eeaabd744d4fe3c3752efe121b7fa4358bade Mon Sep 17 00:00:00 2001 From: Patryk Hegenberg Date: Tue, 23 Sep 2025 08:36:46 +0200 Subject: [PATCH] refactor: make codebase more modular and use a single index for all services, tools and metics --- elasticsearch.go | 25 ++++--- file_monitor.go | 2 +- go.mod | 12 ++- go.sum | 49 +++++++++++- local_storage.go | 180 +++++++++++++++++++++++++++++++++++++++++++++ main.go | 3 +- models.go | 162 ++++++++++++++++------------------------ service_monitor.go | 40 ++++++---- 8 files changed, 343 insertions(+), 130 deletions(-) create mode 100644 local_storage.go diff --git a/elasticsearch.go b/elasticsearch.go index f2dfae6..8ecc079 100644 --- a/elasticsearch.go +++ b/elasticsearch.go @@ -66,7 +66,8 @@ func (esc *ElasticsearchClient) SendBatch(baseIndex string, entries []LogEntry) var body strings.Builder for _, entry := range entries { - indexName := determineIndexName(baseIndex, entry) + // indexName := determineIndexName(baseIndex, entry) + indexName := "tixel" indexLine := fmt.Sprintf(`{"index":{"_index":"%s"}}`, indexName) body.WriteString(indexLine) @@ -102,7 +103,7 @@ func (esc *ElasticsearchClient) SendBatch(baseIndex string, entries []LogEntry) } func (esc *ElasticsearchClient) SendSystemMetrics(baseIndex string, metrics SystemResources) error { - msg := LogMessage{ + msg := LogEntry{ Service: "system-metrics", Timestamp: time.Now(), LogLevel: "Info", @@ -142,13 +143,13 @@ func (esc *ElasticsearchClient) SendSystemMetrics(baseIndex string, metrics Syst return nil } -func determineIndexName(baseIndex string, entry LogEntry) string { - switch entry.Type { - case "system_metrics": - return fmt.Sprintf("%s-system", baseIndex) - case "service_log": - return fmt.Sprintf("%s-service-%s", baseIndex, entry.Service) - default: - return fmt.Sprintf("%s-%s", baseIndex, entry.Tool) - } -} +// func determineIndexName(baseIndex string, entry LogEntry) string { +// switch entry.Type { +// case "system_metrics": +// return fmt.Sprintf("%s-system", baseIndex) +// case "service_log": +// return fmt.Sprintf("%s-service-%s", baseIndex, entry.Service) +// default: +// return fmt.Sprintf("%s-%s", baseIndex, entry.Tool) +// } +// } diff --git a/file_monitor.go b/file_monitor.go index b84ab9c..2f18a13 100644 --- a/file_monitor.go +++ b/file_monitor.go @@ -205,7 +205,7 @@ func (p *NginxTJMLogParser) parseNginxTJM(entry LogEntry) LogEntry { break } } - newEntry.BaseInformation = nginxBase + newEntry.ServiceInformation = nginxBase return newEntry } diff --git a/go.mod b/go.mod index fb0d12c..fdc6247 100644 --- a/go.mod +++ b/go.mod @@ -7,17 +7,23 @@ require ( github.com/hpcloud/tail v1.0.0 github.com/shirou/gopsutil v3.21.11+incompatible github.com/spf13/viper v1.20.1 - golang.org/x/sys v0.31.0 + golang.org/x/sys v0.34.0 + modernc.org/sqlite v1.39.0 ) require ( + github.com/dustin/go-humanize v1.0.1 // indirect github.com/elastic/elastic-transport-go/v8 v8.7.0 // indirect github.com/fsnotify/fsnotify v1.8.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-viper/mapstructure/v2 v2.2.1 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/ncruces/go-strftime v0.1.9 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/sagikazarmark/locafero v0.7.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/afero v1.12.0 // indirect @@ -32,8 +38,12 @@ require ( go.opentelemetry.io/otel/trace v1.29.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect + golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect golang.org/x/text v0.21.0 // indirect gopkg.in/fsnotify.v1 v1.4.7 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + modernc.org/libc v1.66.3 // indirect + modernc.org/mathutil v1.7.1 // indirect + modernc.org/memory v1.11.0 // indirect ) diff --git a/go.sum b/go.sum index 9aa28be..01d3bef 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/elastic/elastic-transport-go/v8 v8.7.0 h1:OgTneVuXP2uip4BA658Xi6Hfw+PeIOod2rY3GVMGoVE= github.com/elastic/elastic-transport-go/v8 v8.7.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= github.com/elastic/go-elasticsearch/v8 v8.19.0 h1:VmfBLNRORY7RZL+9hTxBD97ehl9H8Nxf2QigDh6HuMU= @@ -20,6 +22,8 @@ github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIx github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= @@ -28,10 +32,16 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= +github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M= github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/sagikazarmark/locafero v0.7.0 h1:5MqpDsTGNDhY8sGp0Aowyf0qKsPrhewaLSsFaodPcyo= @@ -72,11 +82,20 @@ go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= +golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o= +golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8= +golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w= +golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww= +golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= +golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= -golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= +golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo= +golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -86,3 +105,29 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkep gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +modernc.org/cc/v4 v4.26.2 h1:991HMkLjJzYBIfha6ECZdjrIYz2/1ayr+FL8GN+CNzM= +modernc.org/cc/v4 v4.26.2/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0= +modernc.org/ccgo/v4 v4.28.0 h1:rjznn6WWehKq7dG4JtLRKxb52Ecv8OUGah8+Z/SfpNU= +modernc.org/ccgo/v4 v4.28.0/go.mod h1:JygV3+9AV6SmPhDasu4JgquwU81XAKLd3OKTUDNOiKE= +modernc.org/fileutil v1.3.8 h1:qtzNm7ED75pd1C7WgAGcK4edm4fvhtBsEiI/0NQ54YM= +modernc.org/fileutil v1.3.8/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc= +modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI= +modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito= +modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks= +modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI= +modernc.org/libc v1.66.3 h1:cfCbjTUcdsKyyZZfEUKfoHcP3S0Wkvz3jgSzByEWVCQ= +modernc.org/libc v1.66.3/go.mod h1:XD9zO8kt59cANKvHPXpx7yS2ELPheAey0vjIuZOhOU8= +modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= +modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= +modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= +modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw= +modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8= +modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns= +modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w= +modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE= +modernc.org/sqlite v1.39.0 h1:6bwu9Ooim0yVYA7IZn9demiQk/Ejp0BtTjBWFLymSeY= +modernc.org/sqlite v1.39.0/go.mod h1:cPTJYSlgg3Sfg046yBShXENNtPrWrDX8bsbAQBzgQ5E= +modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= +modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= diff --git a/local_storage.go b/local_storage.go new file mode 100644 index 0000000..e3f84e6 --- /dev/null +++ b/local_storage.go @@ -0,0 +1,180 @@ +package main + +import ( + "context" + "database/sql" + "encoding/json" + + _ "modernc.org/sqlite" +) + +type StorageService struct { + db *sql.DB +} + +func NewStorageService(dbPath string) (*StorageService, error) { + db, err := sql.Open("sqlite", dbPath) + if err != nil { + return nil, err + } + + createTableStmt := ` + CREATE TABLE IF NOT EXISTS log_entries ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + service TEXT, + timestamp DATETIME, + type TEXT, + host TEXT, + tool TEXT, + log_level TEXT, + log_message TEXT, + raw TEXT, + priority TEXT, + priority_name TEXT, + unit TEXT, + pid INTEGER, + boot_id TEXT, + machine_id TEXT, + fields TEXT, + service_information TEXT, + system_metrics TEXT, + tool_information TEXT + ); + ` + _, err = db.ExecContext(context.Background(), createTableStmt) + if err != nil { + return nil, err + } + + return &StorageService{db: db}, nil +} + +func (s *StorageService) Close() error { + return s.db.Close() +} + +func (s *StorageService) SaveLogEntry(ctx context.Context, entry *LogEntry) error { + fieldsJSON := "" + if entry.Fields != nil { + b, err := json.Marshal(entry.Fields) + if err != nil { + return err + } + fieldsJSON = string(b) + } + + serviceInfoJSON := "" + if entry.ServiceInformation != nil { + b, err := json.Marshal(entry.ServiceInformation) + if err != nil { + return err + } + serviceInfoJSON = string(b) + } + + systemMetricsJSON := "" + if entry.SystemMetrics != nil { + b, err := json.Marshal(entry.SystemMetrics) + if err != nil { + return err + } + systemMetricsJSON = string(b) + } + + toolInfoJSON := "" + if entry.ToolInformation != nil { + b, err := json.Marshal(entry.ToolInformation) + if err != nil { + return err + } + toolInfoJSON = string(b) + } + + stmt := ` + INSERT INTO log_entries + (service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ` + _, err := s.db.ExecContext(ctx, stmt, + entry.Service, + entry.Timestamp, + entry.Type, + entry.Host, + entry.Tool, + entry.LogLevel, + entry.LogMessage, + entry.Raw, + entry.Priority, + entry.PriorityName, + entry.Unit, + entry.PID, + entry.BootID, + entry.MachineID, + fieldsJSON, + serviceInfoJSON, + systemMetricsJSON, + toolInfoJSON, + ) + return err +} + +func (s *StorageService) LoadLogEntry(ctx context.Context, id int64) (*LogEntry, error) { + row := s.db.QueryRowContext(ctx, "SELECT service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information FROM log_entries WHERE id = ?", id) + + var entry LogEntry + var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string + + err := row.Scan( + &entry.Service, + &entry.Timestamp, + &entry.Type, + &entry.Host, + &entry.Tool, + &entry.LogLevel, + &entry.LogMessage, + &entry.Raw, + &entry.Priority, + &entry.PriorityName, + &entry.Unit, + &entry.PID, + &entry.BootID, + &entry.MachineID, + &fieldsJSON, + &serviceInfoJSON, + &systemMetricsJSON, + &toolInfoJSON, + ) + if err != nil { + return nil, err + } + + if fieldsJSON != "" { + var fields map[string]any + if err = json.Unmarshal([]byte(fieldsJSON), &fields); err == nil { + entry.Fields = fields + } + } + + if serviceInfoJSON != "" { + var si any + if err = json.Unmarshal([]byte(serviceInfoJSON), &si); err == nil { + entry.ServiceInformation = si + } + } + + if systemMetricsJSON != "" { + var sm any + if err = json.Unmarshal([]byte(systemMetricsJSON), &sm); err == nil { + entry.SystemMetrics = sm + } + } + + if toolInfoJSON != "" { + var ti any + if err = json.Unmarshal([]byte(toolInfoJSON), &ti); err == nil { + entry.ToolInformation = ti + } + } + + return &entry, nil +} diff --git a/main.go b/main.go index 14267cb..f6f29a4 100644 --- a/main.go +++ b/main.go @@ -26,6 +26,7 @@ func main() { slog.Error("error loading configuration", "error", err) os.Exit(1) } + slog.Info("TIXEL System Monitor started") es, err := NewElasticsearchClient(cfg.Elasticsearch) if err != nil { @@ -38,7 +39,7 @@ func main() { os.Exit(1) } - slog.Info("TIXEL System Monitor started - Elasticsearch connection successful") + slog.Info("Elasticsearch connection successful") logChan := make(chan LogEntry, 1000) ctx, cancel := context.WithCancel(context.Background()) diff --git a/models.go b/models.go index 27677e3..f5e81e0 100644 --- a/models.go +++ b/models.go @@ -113,43 +113,30 @@ type NetworkStat struct { } type LogEntry struct { - Service string `json:"service,omitempty"` - Timestamp time.Time `json:"timestamp"` - Type string `json:"type"` - Host string `json:"host"` - Tool string `json:"tool,omitempty"` - LogLevel string `json:"log_level"` - LogMessage string `json:"message,omitempty"` - SyslogInfo SyslogFields `json:"syslog_information,omitempty"` - BaseInformation any `json:"base_info"` - ServiceInformation any `json:"service_info"` - SystemMetrics SystemResources `json:"system-metrics"` - ToolInformation any `json:"tool_info"` - Raw string `json:"raw,omitempty"` - Priority string `json:"priority,omitempty"` - PriorityName string `json:"priority_name,omitempty"` - Unit string `json:"unit,omitempty"` - PID int `json:"pid,omitempty"` - BootID string `json:"boot_id,omitempty"` - MachineID string `json:"machine_id,omitempty"` - Fields map[string]any `json:"fields,omitempty"` -} - -type LogMessage struct { - Service string `json:"service"` - Timestamp time.Time `json:"timestamp"` - LogLevel string `json:"log_level"` - LogMessage string `json:"log_message"` - SyslogInfo SyslogFields `json:"syslog_information"` - BaseInformation any `json:"base_info"` - ServiceInformation any `json:"service_info"` - SystemMetrics SystemResources `json:"system-metrics"` - ToolInformation any `json:"tool_info"` + Service string `json:"service,omitempty"` + Timestamp time.Time `json:"timestamp"` + Type string `json:"type"` + Host string `json:"host"` + Tool string `json:"tool,omitempty"` + LogLevel string `json:"log_level"` + LogMessage string `json:"log_message,omitempty"` + SyslogInfo SyslogFields `json:"syslog_information,omitempty"` + ServiceInformation any `json:"service_info,omitempty"` + SystemMetrics any `json:"system-metrics,omitempty"` + ToolInformation any `json:"tool_info,omitempty"` + Raw string `json:"raw,omitempty"` + Priority string `json:"priority,omitempty"` + PriorityName string `json:"priority_name,omitempty"` + Unit string `json:"unit,omitempty"` + PID int `json:"pid,omitempty"` + BootID string `json:"boot_id,omitempty"` + MachineID string `json:"machine_id,omitempty"` + Fields map[string]any `json:"fields,omitempty"` } type SyslogFields struct { - SysLogTimestamp time.Time `json:"syslog_timestamp"` - Hostname string `json:"hostname"` + SysLogTimestamp time.Time `json:"syslog_timestamp,omitempty"` + Hostname string `json:"hostname,omitempty"` ProcessInfo string `json:"process_info"` Raw string `json:"raw,omitempty"` Priority string `json:"priority,omitempty"` @@ -165,7 +152,6 @@ func NewLogEntry(entryType string) LogEntry { Timestamp: time.Now(), Type: entryType, Host: hostname, - // Fields: make(map[string]any), } } @@ -208,70 +194,52 @@ type NGinXBaseInfo struct { Route string `json:"route"` } -type TSBaseInfo struct { - TransferID string `json:"transfer_identifier"` - Lane int `json:"thread"` - Direction string `json:"direction"` - Buffers int `json:"buffers"` - FileCount int `json:"file_count"` - FileSize float64 `json:"file_size"` - ChunkSize int `json:"chunksize"` - Streams int `json:"streams"` - TargetDatarate float64 `json:"target_datarate"` - Protocoll string `json:"protocoll"` - Destination string `json:"destination"` - Sender string `json:"sender_id"` - Target string `json:"transfer_target"` - Source string `json:"source"` - Receiver string `json:"receiver_id"` - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` -} - type TSTransferInfo struct { - TransferID string `json:"transfer_identifier"` - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` - TransferLane string `json:"transfer_lane"` - FileCount int `json:"file_count"` - FileSizeMB float64 `json:"file_size_mb"` - DataRateMBs float64 `json:"datarate_mbs"` - Dest string `json:"destination"` - Src string `json:"source"` - SenderID string `json:"sender_id"` - Receiver string `json:"receiver"` - BytesProcessed int64 `json:"bytes_processed"` - Direction string `json:"direction"` - Duration time.Duration `json:"duration_seconds"` - TheoreticalRate float64 `json:"theoretical_rate_mbs"` - Efficiency float64 `json:"efficiency_percent"` - Status string `json:"status"` -} - -type TJMBaseInfo struct { - ProcessID string `json:"process_id"` - TransferID string `json:"transfer_identifier"` - Direction string `json:"direction"` - Username string `json:"username"` - CorrelationID string `json:"correlation_id"` - ThreadID string `json:"thread_id"` - JavaClass string `json:"java_class"` + TransferID string `json:"transfer_identifier,omitempty"` + Lane int `json:"lane,omitempty"` + StartTime time.Time `json:"start_time,omitempty"` + EndTime time.Time `json:"end_time,omitempty"` + Buffers int `json:"buffers,omitempty"` + Streams int `json:"streams,omitempty"` + ChunkSize int `json:"chunksize,omitempty"` + TransferLane string `json:"transfer_lane,omitempty"` + FileCount int `json:"file_count,omitempty"` + FileSizeMB float64 `json:"file_size_mb,omitempty"` + DataRateMBs float64 `json:"datarate_mbs,omitempty"` + TargetDatarate float64 `json:"target_datarate,omitempty"` + Protocoll string `json:"protocoll,omitempty"` + Target string `json:"transfer_target,omitempty"` + Dest string `json:"destination,omitempty"` + Src string `json:"source,omitempty"` + SenderID string `json:"sender_id,omitempty"` + Receiver string `json:"receiver,omitempty"` + BytesProcessed int64 `json:"bytes_processed,omitempty"` + Direction string `json:"direction,omitempty"` + Duration time.Duration `json:"duration_seconds,omitempty"` + TheoreticalRate float64 `json:"theoretical_rate_mbs,omitempty"` + Efficiency float64 `json:"efficiency_percent,omitempty"` + Status string `json:"status,omitempty"` } type TJMTransferInfo struct { - TransferID string `json:"transfer_identifier"` - TransferName string `json:"transfer_name"` - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` - TransferLane string `json:"transfer_lane"` - Dest string `json:"destination"` - Src string `json:"source"` - SenderID string `json:"sender_id"` - Receiver string `json:"receiver"` - Direction string `json:"direction"` - Worker string `json:"worker"` - Duration time.Duration `json:"duration"` - DataRateMBs float64 `json:"datarate_mbs"` - BytesProcessed int64 `json:"bytes_processed"` - FileSizeMB float64 `json:"file_size_mb"` + TransferID string `json:"transfer_identifier,omitempty"` + TransferName string `json:"transfer_name,omitempty"` + ProcessID string `json:"process_id,omitempty"` + Username string `json:"username,omitempty"` + CorrelationID string `json:"correlation_id,omitempty"` + ThreadID string `json:"thread_id,omitempty"` + JavaClass string `json:"java_class,omitempty"` + StartTime time.Time `json:"start_time,omitempty"` + EndTime time.Time `json:"end_time,omitempty"` + TransferLane string `json:"transfer_lane,omitempty"` + Dest string `json:"destination,omitempty"` + Src string `json:"source,omitempty"` + SenderID string `json:"sender_id,omitempty"` + Receiver string `json:"receiver,omitempty"` + Direction string `json:"direction,omitempty"` + Worker string `json:"worker,omitempty"` + Duration time.Duration `json:"duration,omitempty"` + DataRateMBs float64 `json:"datarate_mbs,omitempty"` + BytesProcessed int64 `json:"bytes_processed,omitempty"` + FileSizeMB float64 `json:"file_size_mb,omitempty"` } diff --git a/service_monitor.go b/service_monitor.go index 585da81..187885e 100644 --- a/service_monitor.go +++ b/service_monitor.go @@ -198,6 +198,9 @@ func (jep *JournalEntryParser) extractSystemdFields(journalData map[string]any, for _, field := range systemdFields { if value, ok := journalData[field]; ok { esFieldName := strings.ToLower(strings.TrimPrefix(field, "_")) + if entry.SyslogInfo.Fields == nil { + entry.SyslogInfo.Fields = make(map[string]any) + } entry.SyslogInfo.Fields[esFieldName] = value } } @@ -238,7 +241,7 @@ var ( func parseTixstreamService(entry LogEntry) LogEntry { newEntry := entry - var baseInfo TSBaseInfo + var baseInfo TSTransferInfo matches := tsServicePattern.FindStringSubmatch(newEntry.LogMessage) if len(matches) > 0 { @@ -285,13 +288,13 @@ func parseTixstreamService(entry LogEntry) LogEntry { baseInfo.Lane = threadInt baseInfo.Buffers = buffersInt baseInfo.FileCount = fileCountInt - baseInfo.FileSize = fileSizeFloat + baseInfo.FileSizeMB = fileSizeFloat baseInfo.ChunkSize = chunkSizeInt baseInfo.Streams = streamsInt baseInfo.TargetDatarate = datarateFloat baseInfo.Protocoll = tsDetail[8] - baseInfo.Destination = tsDetail[9] - baseInfo.Sender = tsDetail[10] + baseInfo.Dest = tsDetail[9] + baseInfo.SenderID = tsDetail[10] } tsDetail = tsDetailPattern2.FindStringSubmatch(newEntry.LogMessage) if len(tsDetail) > 0 { @@ -309,41 +312,43 @@ func parseTixstreamService(entry LogEntry) LogEntry { baseInfo.Lane = threadInt baseInfo.Buffers = buffersInt baseInfo.FileCount = fileCountInt - baseInfo.FileSize = fileSizeFloat + baseInfo.FileSizeMB = fileSizeFloat baseInfo.ChunkSize = chunkSizeInt baseInfo.Streams = streamsInt baseInfo.TargetDatarate = datarateFloat baseInfo.Protocoll = tsDetail[8] - baseInfo.Source = tsDetail[9] + baseInfo.Src = tsDetail[9] baseInfo.Receiver = tsDetail[10] } tsDetail = tsDetailPattern4.FindStringSubmatch(newEntry.LogMessage) if len(tsDetail) > 0 { threadInt, _ := strconv.Atoi(strings.Split(tsDetail[1], "/")[0]) baseInfo.Lane = threadInt - baseInfo.Source = tsDetail[2] - baseInfo.Destination = tsDetail[3] + baseInfo.Src = tsDetail[2] + baseInfo.Dest = tsDetail[3] } if strings.Contains(newEntry.LogMessage, "Transfer start") || strings.Contains(newEntry.LogMessage, "Transfer started,") { baseInfo.StartTime = newEntry.Timestamp + } else { + baseInfo.StartTime = time.Now() } if strings.Contains(newEntry.LogMessage, "Transfer stopped local state=finished") { baseInfo.EndTime = newEntry.Timestamp + } else { + baseInfo.EndTime = baseInfo.StartTime } if transferID != "" { baseInfo.TransferID = transferID } else { baseInfo.TransferID = "no_transfer_id" } - if !baseInfo.StartTime.IsZero() { - newEntry.BaseInformation = baseInfo - } + newEntry.ServiceInformation = baseInfo return newEntry } func parseTJMService(entry LogEntry) LogEntry { newEntry := entry - var baseInfo TJMBaseInfo + var baseInfo TJMTransferInfo logContent := entry.LogMessage msg := strings.TrimSpace(logContent) @@ -367,7 +372,7 @@ func parseTJMService(entry LogEntry) LogEntry { } newEntry.LogLevel = strings.TrimSpace(matches[2]) newEntry.LogMessage = strings.TrimSpace(matches[8]) - baseInfo = TJMBaseInfo{ + baseInfo = TJMTransferInfo{ ProcessID: strings.TrimSpace(matches[3]), CorrelationID: strings.TrimSpace(matches[4]), Username: strings.TrimSpace(matches[5]), @@ -405,7 +410,9 @@ func parseTJMService(entry LogEntry) LogEntry { } else { baseInfo.TransferID = "no_transfer_id" } - newEntry.BaseInformation = baseInfo + baseInfo.StartTime = newEntry.Timestamp + baseInfo.StartTime = newEntry.Timestamp + newEntry.ServiceInformation = baseInfo return newEntry } @@ -433,7 +440,7 @@ func parseAMService(entry LogEntry) LogEntry { } newEntry.LogLevel = matches[2] newEntry.LogMessage = matches[6] - newEntry.BaseInformation = baseInfo + newEntry.ServiceInformation = baseInfo return newEntry } @@ -461,7 +468,7 @@ func parseTCCService(entry LogEntry) LogEntry { } newEntry.LogLevel = matches[2] newEntry.LogMessage = matches[6] - newEntry.BaseInformation = baseInfo + newEntry.ServiceInformation = baseInfo return newEntry } @@ -501,6 +508,7 @@ func parseNginxService(entry LogEntry) LogEntry { baseInfo.RequestURI = requestParts[1] baseInfo.HTTPVersion = requestParts[2] } + newEntry.ServiceInformation = baseInfo return newEntry }