From 9aa1b7384dfe0f23fbafb0a04e4f1218ba4a8335 Mon Sep 17 00:00:00 2001 From: Patryk Hegenberg Date: Wed, 24 Sep 2025 22:57:37 +0200 Subject: [PATCH] refactor: move models to their own package to use the same as in the importer --- elasticsearch.go | 11 ++-- elasticsearch_exporterV2.go | 3 +- export_manager.go | 3 +- file_monitor.go | 21 +++--- local_storage.go | 7 +- local_storageV2.go | 19 +++--- log_processor.go | 7 +- main.go | 3 +- models.go => models/models.go | 119 ++++++++++++++++++---------------- service_monitor.go | 43 ++++++------ storage_interface.go | 9 +-- system_metrics.go | 69 ++++++++++---------- system_metricsV2.go | 5 +- web_serviceV2.go | 3 +- 14 files changed, 170 insertions(+), 152 deletions(-) rename models.go => models/models.go (88%) diff --git a/elasticsearch.go b/elasticsearch.go index b8a7b11..3b3537d 100644 --- a/elasticsearch.go +++ b/elasticsearch.go @@ -7,6 +7,7 @@ import ( "log/slog" "strings" "time" + "tixel_watch/models" "github.com/elastic/go-elasticsearch/v8" ) @@ -50,8 +51,8 @@ func TestElasticsearchConnection(es *elasticsearch.Client) error { } type ElasticsearchSender interface { - SendBatch(baseIndex string, entries []LogEntry) error - SendSystemMetrics(baseIndex string, metrics SystemResources) error + SendBatch(baseIndex string, entries []models.LogMessage) error + SendSystemMetrics(baseIndex string, metrics models.SystemResources) error } type ElasticsearchClient struct { @@ -62,7 +63,7 @@ func NewElasticsearchSender(client *elasticsearch.Client) ElasticsearchSender { return &ElasticsearchClient{client: client} } -func (esc *ElasticsearchClient) SendBatch(baseIndex string, entries []LogEntry) error { +func (esc *ElasticsearchClient) SendBatch(baseIndex string, entries []models.LogMessage) error { if len(entries) == 0 { return nil } @@ -105,8 +106,8 @@ func (esc *ElasticsearchClient) SendBatch(baseIndex string, entries []LogEntry) return nil } -func (esc *ElasticsearchClient) SendSystemMetrics(baseIndex string, metrics SystemResources) error { - msg := LogEntry{ +func (esc *ElasticsearchClient) SendSystemMetrics(baseIndex string, metrics models.SystemResources) error { + msg := models.LogMessage{ Service: "system-metrics", Timestamp: time.Now(), LogLevel: "Info", diff --git a/elasticsearch_exporterV2.go b/elasticsearch_exporterV2.go index 537c1f7..b0205e5 100644 --- a/elasticsearch_exporterV2.go +++ b/elasticsearch_exporterV2.go @@ -7,6 +7,7 @@ import ( "log/slog" "strings" "time" + "tixel_watch/models" "github.com/elastic/go-elasticsearch/v8" ) @@ -28,7 +29,7 @@ func NewElasticsearchExporterV2(config ElasticsearchConfig) (*ElasticsearchExpor }, nil } -func (e *ElasticsearchExporterV2) Export(ctx context.Context, entries []LogEntry) error { +func (e *ElasticsearchExporterV2) Export(ctx context.Context, entries []models.LogMessage) error { if len(entries) == 0 { return nil } diff --git a/export_manager.go b/export_manager.go index 41d037f..bf0ee72 100644 --- a/export_manager.go +++ b/export_manager.go @@ -8,6 +8,7 @@ import ( "strings" "sync" "time" + "tixel_watch/models" ) type ExportManager struct { @@ -144,7 +145,7 @@ func (em *ExportManager) exportBatch(ctx context.Context) { } } -func (em *ExportManager) exportWithRetry(ctx context.Context, name string, exporter ExporterInterface, entries []LogEntry) error { +func (em *ExportManager) exportWithRetry(ctx context.Context, name string, exporter ExporterInterface, entries []models.LogMessage) error { var lastErr error for attempt := 0; attempt <= em.config.RetryAttempts; attempt++ { diff --git a/file_monitor.go b/file_monitor.go index 2f18a13..cec14fb 100644 --- a/file_monitor.go +++ b/file_monitor.go @@ -7,6 +7,7 @@ import ( "regexp" "strconv" "strings" + "tixel_watch/models" "github.com/hpcloud/tail" ) @@ -17,7 +18,7 @@ type FileMonitor struct { } type LogParser interface { - Parse(line string, toolName string) LogEntry + Parse(line string, toolName string) models.LogMessage } func NewFileMonitor(config ToolConfig) *FileMonitor { @@ -49,7 +50,7 @@ func NewFileMonitor(config ToolConfig) *FileMonitor { } } -func (fm *FileMonitor) Start(ctx context.Context, out chan<- LogEntry) error { +func (fm *FileMonitor) Start(ctx context.Context, out chan<- models.LogMessage) error { t, err := tail.TailFile(fm.config.LogFile, tail.Config{ Follow: true, ReOpen: true, @@ -98,8 +99,8 @@ func (fm *FileMonitor) Start(ctx context.Context, out chan<- LogEntry) error { type DefaultLogParser struct{} -func (p *DefaultLogParser) Parse(line string, toolName string) LogEntry { - entry := NewLogEntry("log_entry") +func (p *DefaultLogParser) Parse(line string, toolName string) models.LogMessage { + entry := models.NewLogMessage("log_entry", hostname) entry.Tool = toolName entry.LogMessage = strings.TrimSpace(line) entry.Raw = line @@ -111,8 +112,8 @@ type RegexLogParser struct { fields map[string]string } -func (p *RegexLogParser) Parse(line string, toolName string) LogEntry { - entry := NewLogEntry("log_entry") +func (p *RegexLogParser) Parse(line string, toolName string) models.LogMessage { + entry := models.NewLogMessage("log_entry", hostname) entry.Tool = toolName entry.Raw = line @@ -156,17 +157,17 @@ func (p *RegexLogParser) parseWithPattern(text string) map[string]any { type NginxTJMLogParser struct{} -func (p *NginxTJMLogParser) Parse(line string, toolName string) LogEntry { - entry := NewLogEntry("log_entry") +func (p *NginxTJMLogParser) Parse(line string, toolName string) models.LogMessage { + entry := models.NewLogMessage("log_entry", hostname) entry.Tool = toolName entry.Raw = line entry = p.parseNginxTJM(entry) return entry } -func (p *NginxTJMLogParser) parseNginxTJM(entry LogEntry) LogEntry { +func (p *NginxTJMLogParser) parseNginxTJM(entry models.LogMessage) models.LogMessage { newEntry := entry - var nginxBase NGinXBaseInfo + var nginxBase models.NGinXBaseInfo parts := strings.Fields(entry.Raw) if len(parts) < 10 { return newEntry diff --git a/local_storage.go b/local_storage.go index e3f84e6..f4b6265 100644 --- a/local_storage.go +++ b/local_storage.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "encoding/json" + "tixel_watch/models" _ "modernc.org/sqlite" ) @@ -53,7 +54,7 @@ func (s *StorageService) Close() error { return s.db.Close() } -func (s *StorageService) SaveLogEntry(ctx context.Context, entry *LogEntry) error { +func (s *StorageService) SaveLogEntry(ctx context.Context, entry *models.LogMessage) error { fieldsJSON := "" if entry.Fields != nil { b, err := json.Marshal(entry.Fields) @@ -118,10 +119,10 @@ func (s *StorageService) SaveLogEntry(ctx context.Context, entry *LogEntry) erro return err } -func (s *StorageService) LoadLogEntry(ctx context.Context, id int64) (*LogEntry, error) { +func (s *StorageService) LoadLogEntry(ctx context.Context, id int64) (*models.LogMessage, error) { row := s.db.QueryRowContext(ctx, "SELECT service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information FROM log_entries WHERE id = ?", id) - var entry LogEntry + var entry models.LogMessage var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string err := row.Scan( diff --git a/local_storageV2.go b/local_storageV2.go index 24856b8..bf2f2d3 100644 --- a/local_storageV2.go +++ b/local_storageV2.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "strings" + "tixel_watch/models" _ "modernc.org/sqlite" ) @@ -80,11 +81,11 @@ func createTables(db *sql.DB) error { return nil } -func (s *SQLiteStorage) Store(ctx context.Context, entry *LogEntry) error { - return s.StoreBatch(ctx, []LogEntry{*entry}) +func (s *SQLiteStorage) Store(ctx context.Context, entry *models.LogMessage) error { + return s.StoreBatch(ctx, []models.LogMessage{*entry}) } -func (s *SQLiteStorage) StoreBatch(ctx context.Context, entries []LogEntry) error { +func (s *SQLiteStorage) StoreBatch(ctx context.Context, entries []models.LogMessage) error { if len(entries) == 0 { return nil } @@ -139,7 +140,7 @@ func (s *SQLiteStorage) StoreBatch(ctx context.Context, entries []LogEntry) erro return tx.Commit() } -func (s *SQLiteStorage) Query(ctx context.Context, query StorageQuery) ([]LogEntry, error) { +func (s *SQLiteStorage) Query(ctx context.Context, query StorageQuery) ([]models.LogMessage, error) { sqlQuery := "SELECT service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information FROM log_entries WHERE 1=1" args := []any{} argCount := 0 @@ -198,9 +199,9 @@ func (s *SQLiteStorage) Query(ctx context.Context, query StorageQuery) ([]LogEnt } defer rows.Close() - var entries []LogEntry + var entries []models.LogMessage for rows.Next() { - var entry LogEntry + var entry models.LogMessage var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string err := rows.Scan( @@ -280,7 +281,7 @@ func (s *SQLiteStorage) MarkAsExported(ctx context.Context, ids []int64) error { return err } -func (s *SQLiteStorage) GetUnexportedEntries(ctx context.Context, limit int) ([]LogEntry, error) { +func (s *SQLiteStorage) GetUnexportedEntries(ctx context.Context, limit int) ([]models.LogMessage, error) { sqlQuery := `SELECT id, service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information FROM log_entries WHERE exported_at IS NULL ORDER BY timestamp ASC` @@ -295,9 +296,9 @@ func (s *SQLiteStorage) GetUnexportedEntries(ctx context.Context, limit int) ([] } defer rows.Close() - var entries []LogEntry + var entries []models.LogMessage for rows.Next() { - var entry LogEntry + var entry models.LogMessage var id int64 var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string diff --git a/log_processor.go b/log_processor.go index 5921e11..ab0fd25 100644 --- a/log_processor.go +++ b/log_processor.go @@ -4,6 +4,7 @@ import ( "context" "log/slog" "time" + "tixel_watch/models" ) type LogProcessor struct { @@ -18,8 +19,8 @@ func NewLogProcessor(storage StorageInterface) *LogProcessor { } } -func (lp *LogProcessor) Start(ctx context.Context, logChan <-chan LogEntry) { - batch := make([]LogEntry, 0, lp.batchSize) +func (lp *LogProcessor) Start(ctx context.Context, logChan <-chan models.LogMessage) { + batch := make([]models.LogMessage, 0, lp.batchSize) ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() @@ -58,7 +59,7 @@ func (lp *LogProcessor) Start(ctx context.Context, logChan <-chan LogEntry) { } } -func (lp *LogProcessor) storeBatch(ctx context.Context, batch []LogEntry) { +func (lp *LogProcessor) storeBatch(ctx context.Context, batch []models.LogMessage) { if len(batch) == 0 { return } diff --git a/main.go b/main.go index 0d829bd..79e5dfe 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "sync" "syscall" "time" + "tixel_watch/models" ) var hostname string @@ -76,7 +77,7 @@ func main() { // exportManager.RegisterExporter("grafana", grafanaExporter) } - logChan := make(chan LogEntry, 1000) + logChan := make(chan models.LogMessage, 1000) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/models.go b/models/models.go similarity index 88% rename from models.go rename to models/models.go index f5e81e0..5c9cfee 100644 --- a/models.go +++ b/models/models.go @@ -1,8 +1,13 @@ -package main +package models -import ( - "time" -) +import "time" + +type ESDocument struct { + Index string `json:"_index"` + Type string `json:"_type,omitempty"` + ID string `json:"_id,omitempty"` + Log LogMessage `json:"log"` +} type SystemResources struct { Timestamp time.Time `json:"@timestamp"` @@ -112,7 +117,7 @@ type NetworkStat struct { PacketsRecv uint64 `json:"packets_recv"` } -type LogEntry struct { +type LogMessage struct { Service string `json:"service,omitempty"` Timestamp time.Time `json:"timestamp"` Type string `json:"type"` @@ -120,7 +125,6 @@ type LogEntry struct { Tool string `json:"tool,omitempty"` LogLevel string `json:"log_level"` LogMessage string `json:"log_message,omitempty"` - SyslogInfo SyslogFields `json:"syslog_information,omitempty"` ServiceInformation any `json:"service_info,omitempty"` SystemMetrics any `json:"system-metrics,omitempty"` ToolInformation any `json:"tool_info,omitempty"` @@ -132,40 +136,22 @@ type LogEntry struct { BootID string `json:"boot_id,omitempty"` MachineID string `json:"machine_id,omitempty"` Fields map[string]any `json:"fields,omitempty"` + // SyslogInfo SyslogFields `json:"syslog_information,omitempty"` } +// type LogMessage struct { +// Service string `json:"service"` +// Timestamp time.Time `json:"timestamp"` +// LogLevel string `json:"log_level"` +// LogMessage string `json:"log_message"` +// SyslogInfo SyslogFields `json:"syslog_information"` +// ServiceInformation any `json:"service_info,omitempty"` +// } + type SyslogFields struct { - SysLogTimestamp time.Time `json:"syslog_timestamp,omitempty"` - Hostname string `json:"hostname,omitempty"` - ProcessInfo string `json:"process_info"` - Raw string `json:"raw,omitempty"` - Priority string `json:"priority,omitempty"` - Unit string `json:"unit,omitempty"` - PID int `json:"pid,omitempty"` - BootID string `json:"boot_id,omitempty"` - MachineID string `json:"machine_id,omitempty"` - Fields map[string]any `json:"fields"` -} - -func NewLogEntry(entryType string) LogEntry { - return LogEntry{ - Timestamp: time.Now(), - Type: entryType, - Host: hostname, - } -} - -func NewSystemResources() SystemResources { - return SystemResources{ - Timestamp: time.Now(), - Type: "system_metrics", - Host: hostname, - DiskUsage: make(map[string]DiskUsage), - DiskIOStats: make(map[string]DiskIOStat), - NetworkStats: make(map[string]NetworkStat), - NetworkLatency: make(map[string]LatencyInfo), - BandwidthUtilization: make(map[string]BandwidthInfo), - } + SysLogTimestamp time.Time `json:"syslog_timestamp"` + Hostname string `json:"hostname"` + ProcessInfo string `json:"process_info"` } type AMBaseInfo struct { @@ -180,25 +166,11 @@ type TCCBaseInfo struct { LoggerName string `json:"logger_name"` } -type NGinXBaseInfo struct { - ClientIP string `json:"client_ip"` - RemoteUser string `json:"remote_user"` - Request string `json:"request"` - StatusCode int `json:"status_code"` - BytesSend int `json:"bytes_sent"` - Referer string `json:"referer"` - UserAgent string `json:"user_agent"` - HTTPMethod string `json:"http_method"` - RequestURI string `json:"request_uri"` - HTTPVersion string `json:"http_version"` - Route string `json:"route"` -} - type TSTransferInfo struct { TransferID string `json:"transfer_identifier,omitempty"` Lane int `json:"lane,omitempty"` - StartTime time.Time `json:"start_time,omitempty"` - EndTime time.Time `json:"end_time,omitempty"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` Buffers int `json:"buffers,omitempty"` Streams int `json:"streams,omitempty"` ChunkSize int `json:"chunksize,omitempty"` @@ -219,6 +191,7 @@ type TSTransferInfo struct { TheoreticalRate float64 `json:"theoretical_rate_mbs,omitempty"` Efficiency float64 `json:"efficiency_percent,omitempty"` Status string `json:"status,omitempty"` + Hostname string `json:"hostname"` } type TJMTransferInfo struct { @@ -229,8 +202,8 @@ type TJMTransferInfo struct { CorrelationID string `json:"correlation_id,omitempty"` ThreadID string `json:"thread_id,omitempty"` JavaClass string `json:"java_class,omitempty"` - StartTime time.Time `json:"start_time,omitempty"` - EndTime time.Time `json:"end_time,omitempty"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` TransferLane string `json:"transfer_lane,omitempty"` Dest string `json:"destination,omitempty"` Src string `json:"source,omitempty"` @@ -242,4 +215,40 @@ type TJMTransferInfo struct { DataRateMBs float64 `json:"datarate_mbs,omitempty"` BytesProcessed int64 `json:"bytes_processed,omitempty"` FileSizeMB float64 `json:"file_size_mb,omitempty"` + Hostname string `json:"hostname"` +} + +type NGinXBaseInfo struct { + ClientIP string `json:"client_ip"` + RemoteUser string `json:"remote_user"` + Request string `json:"request"` + StatusCode int `json:"status_code"` + BytesSend int `json:"bytes_sent"` + Referer string `json:"referer"` + UserAgent string `json:"user_agent"` + HTTPMethod string `json:"http_method"` + RequestURI string `json:"request_uri"` + HTTPVersion string `json:"http_version"` + Route string `json:"route"` +} + +func NewLogMessage(entryType, hostname string) LogMessage { + return LogMessage{ + Timestamp: time.Now(), + Type: entryType, + Host: hostname, + } +} + +func NewSystemResources(hostname string) SystemResources { + return SystemResources{ + Timestamp: time.Now(), + Type: "system_metrics", + Host: hostname, + DiskUsage: make(map[string]DiskUsage), + DiskIOStats: make(map[string]DiskIOStat), + NetworkStats: make(map[string]NetworkStat), + NetworkLatency: make(map[string]LatencyInfo), + BandwidthUtilization: make(map[string]BandwidthInfo), + } } diff --git a/service_monitor.go b/service_monitor.go index e91a4a8..c5a3085 100644 --- a/service_monitor.go +++ b/service_monitor.go @@ -11,6 +11,7 @@ import ( "strconv" "strings" "time" + "tixel_watch/models" ) type ServiceMonitor struct { @@ -23,7 +24,7 @@ func NewServiceMonitor(config ServiceConfig) *ServiceMonitor { } } -func (sm *ServiceMonitor) Start(ctx context.Context, out chan<- LogEntry) error { +func (sm *ServiceMonitor) Start(ctx context.Context, out chan<- models.LogMessage) error { args := sm.buildJournalctlArgs() slog.Info("starting journalctl", "arguments", args) @@ -116,13 +117,13 @@ func NewJournalEntryParser(serviceName, unitName string) *JournalEntryParser { } } -func (jep *JournalEntryParser) Parse(jsonLine string) (LogEntry, error) { +func (jep *JournalEntryParser) Parse(jsonLine string) (models.LogMessage, error) { var journalData map[string]any if err := json.Unmarshal([]byte(jsonLine), &journalData); err != nil { - return LogEntry{}, fmt.Errorf("JSON unmarshal error: %w", err) + return models.LogMessage{}, fmt.Errorf("JSON unmarshal error: %w", err) } - entry := NewLogEntry("service_log") + entry := models.NewLogMessage("service_log", hostname) entry.Service = jep.serviceName entry.Unit = jep.unitName @@ -147,7 +148,6 @@ func (jep *JournalEntryParser) Parse(jsonLine string) (LogEntry, error) { if pidStr, ok := journalData["_PID"].(string); ok { if pid, err := strconv.Atoi(pidStr); err == nil { entry.PID = pid - entry.SyslogInfo.ProcessInfo = strconv.FormatInt(int64(pid), 10) } } @@ -188,7 +188,7 @@ func (jep *JournalEntryParser) getPriorityName(priority string) string { return "unknown" } -func (jep *JournalEntryParser) extractSystemdFields(journalData map[string]any, entry *LogEntry) { +func (jep *JournalEntryParser) extractSystemdFields(journalData map[string]any, entry *models.LogMessage) { systemdFields := []string{ "_SYSTEMD_UNIT", "_SYSTEMD_USER_UNIT", "_SYSTEMD_SLICE", "_BOOT_ID", "_MACHINE_ID", "_HOSTNAME", "_TRANSPORT", @@ -201,15 +201,12 @@ func (jep *JournalEntryParser) extractSystemdFields(journalData map[string]any, for _, field := range systemdFields { if value, ok := journalData[field]; ok { esFieldName := strings.ToLower(strings.TrimPrefix(field, "_")) - if entry.SyslogInfo.Fields == nil { - entry.SyslogInfo.Fields = make(map[string]any) - } - entry.SyslogInfo.Fields[esFieldName] = value + entry.Fields[esFieldName] = value } } } -func (jep *JournalEntryParser) parseServiceSpecific(entry LogEntry) LogEntry { +func (jep *JournalEntryParser) parseServiceSpecific(entry models.LogMessage) models.LogMessage { switch jep.serviceName { case "tixstream": return parseTixstreamService(entry) @@ -230,7 +227,7 @@ var ( amServicePattern = regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z)\s+(\w+)\s+(\d+)\s+---\s+\[\s*([^\]]*)\]\s+([\w\.]+)\s*:\s*(.*)$`) tccServicePattern = regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z)\s+(\w+)\s+(\d+)\s+---\s+\[\s*([^\]]*)\]\s+([\w\.]+)\s*:\s*(.*)$`) tjmServicePattern = regexp.MustCompile(`^(?