diff --git a/config.go b/config.go index e1b449e..6818b77 100644 --- a/config.go +++ b/config.go @@ -36,6 +36,7 @@ type ServiceConfig struct { } type ElasticsearchConfig struct { + Enabled bool `mapstructure:"enabled"` URL string `mapstructure:"url"` Index string `mapstructure:"index"` Username string `mapstructure:"username"` @@ -43,6 +44,11 @@ type ElasticsearchConfig struct { Timeout int `mapstructure:"timeout"` } +type LocalStorage struct { + Enable bool `mapstructure:"enabled"` + DBPath string `mapstructure:"db_path"` +} + type SystemMetrics struct { Enabled bool `mapstructure:"enabled"` CollectCPU bool `mapstructure:"collect_cpu"` @@ -66,6 +72,7 @@ type SystemMetrics struct { type Config struct { Elasticsearch ElasticsearchConfig `mapstructure:"elasticsearch"` + LocalStorage LocalStorage `mapstrucutre:"localstorage"` Tools []ToolConfig `mapstructure:"tools"` Services []ServiceConfig `mapstructure:"services"` PollIntervalSeconds int `mapstructure:"poll_interval_seconds"` diff --git a/elasticsearch.go b/elasticsearch.go index 1d1be2c..f2dfae6 100644 --- a/elasticsearch.go +++ b/elasticsearch.go @@ -102,12 +102,18 @@ func (esc *ElasticsearchClient) SendBatch(baseIndex string, entries []LogEntry) } func (esc *ElasticsearchClient) SendSystemMetrics(baseIndex string, metrics SystemResources) error { - data, err := json.Marshal(metrics) + msg := LogMessage{ + Service: "system-metrics", + Timestamp: time.Now(), + LogLevel: "Info", + SystemMetrics: metrics, + } + data, err := json.Marshal(msg) if err != nil { return fmt.Errorf("JSON marshalling error: %w", err) } - systemIndex := fmt.Sprintf("%s-system", baseIndex) + systemIndex := "tixel" ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() diff --git a/elasticsearch_exporter.go b/elasticsearch_exporter.go index b079724..0a36f5f 100644 --- a/elasticsearch_exporter.go +++ b/elasticsearch_exporter.go @@ -31,7 +31,7 @@ type ExportResult struct { Error string `json:"error,omitempty"` } -func (e *ElasticsearchExporter) ExportToStream(ctx context.Context, indices []string, batchSize int, writer io.Writer) error { +func (e *ElasticsearchExporter) ExportToStream(ctx context.Context, indices []string, batchSize int, since int, writer io.Writer) error { startTime := time.Now() if _, err := writer.Write([]byte("{\n \"export_info\": {\n")); err != nil { @@ -42,6 +42,7 @@ func (e *ElasticsearchExporter) ExportToStream(ctx context.Context, indices []st "timestamp": startTime, "indices": indices, "batch_size": batchSize, + "sinceDays": since, } infoBytes, err := json.MarshalIndent(exportInfo, " ", " ") @@ -72,7 +73,7 @@ func (e *ElasticsearchExporter) ExportToStream(ctx context.Context, indices []st } first = false - result := e.exportIndex(ctx, index, batchSize, writer) + result := e.exportIndex(ctx, index, batchSize, since, writer) results = append(results, result) if result.Error != "" { @@ -98,7 +99,7 @@ func (e *ElasticsearchExporter) ExportToStream(ctx context.Context, indices []st return nil } -func (e *ElasticsearchExporter) exportIndex(ctx context.Context, index string, batchSize int, writer io.Writer) ExportResult { +func (e *ElasticsearchExporter) exportIndex(ctx context.Context, index string, batchSize int, since int, writer io.Writer) ExportResult { startTime := time.Now() result := ExportResult{ Index: index, @@ -113,7 +114,18 @@ func (e *ElasticsearchExporter) exportIndex(ctx context.Context, index string, b } query := `{"query":{"match_all":{}}}` - + if since > 0 { + query = fmt.Sprintf(`{ + "query": { + "range": { + "timestamp": { + "gte": "now-%dd/d", + "lt": "now/d" + } + } + } +}`, since) + } res, err := e.client.Search( e.client.Search.WithContext(ctx), e.client.Search.WithIndex(index), diff --git a/file_monitor.go b/file_monitor.go index 83630c9..b84ab9c 100644 --- a/file_monitor.go +++ b/file_monitor.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "regexp" + "strconv" "strings" "github.com/hpcloud/tail" @@ -100,7 +101,7 @@ type DefaultLogParser struct{} func (p *DefaultLogParser) Parse(line string, toolName string) LogEntry { entry := NewLogEntry("log_entry") entry.Tool = toolName - entry.Message = strings.TrimSpace(line) + entry.LogMessage = strings.TrimSpace(line) entry.Raw = line return entry } @@ -119,7 +120,7 @@ func (p *RegexLogParser) Parse(line string, toolName string) LogEntry { if fields != nil { entry.Fields = fields } else { - entry.Message = strings.TrimSpace(line) + entry.LogMessage = strings.TrimSpace(line) } return entry @@ -159,34 +160,36 @@ func (p *NginxTJMLogParser) Parse(line string, toolName string) LogEntry { entry := NewLogEntry("log_entry") entry.Tool = toolName entry.Raw = line - entry.Fields = p.parseNginxTJM(line) + entry = p.parseNginxTJM(entry) return entry } -func (p *NginxTJMLogParser) parseNginxTJM(text string) map[string]any { - parts := strings.Fields(text) +func (p *NginxTJMLogParser) parseNginxTJM(entry LogEntry) LogEntry { + newEntry := entry + var nginxBase NGinXBaseInfo + parts := strings.Fields(entry.Raw) if len(parts) < 10 { - return map[string]any{ - "raw": text, - } + return newEntry } - fields := make(map[string]any) - if len(parts) > 0 { - timestamp := strings.Trim(parts[0], "[]") - fields["timestamp"] = timestamp + timestampStr := strings.Trim(parts[0], "[]") + timestamp, err := parseRFC3339WithOptionalZ(timestampStr) + if err != nil { + slog.Error("unable to parse time", "error", err) + } + newEntry.Timestamp = timestamp } if len(parts) > 2 { - fields["client_ip"] = parts[2] + nginxBase.ClientIP = parts[2] } for i, part := range parts { if strings.HasPrefix(part, "\"") { if i+1 < len(parts) { - fields["method"] = strings.Trim(part, "\"") - fields["route"] = parts[i+1] + nginxBase.HTTPMethod = strings.Trim(part, "\"") + nginxBase.Route = parts[i+1] } break } @@ -194,10 +197,15 @@ func (p *NginxTJMLogParser) parseNginxTJM(text string) map[string]any { for _, part := range parts { if after, ok := strings.CutPrefix(part, "status="); ok { - fields["status"] = after + statusCode, err := strconv.ParseInt(after, 10, 64) + if err != nil { + slog.Error("cant convert statuscode", "error", err) + } + nginxBase.StatusCode = int(statusCode) break } } + newEntry.BaseInformation = nginxBase - return fields + return newEntry } diff --git a/models.go b/models.go index 6138d3b..27677e3 100644 --- a/models.go +++ b/models.go @@ -113,19 +113,51 @@ type NetworkStat struct { } type LogEntry struct { - Timestamp time.Time `json:"@timestamp"` - Type string `json:"type"` - Host string `json:"host"` - Tool string `json:"tool,omitempty"` - Service string `json:"service,omitempty"` - Message string `json:"message,omitempty"` - Fields map[string]any `json:"fields,omitempty"` - Raw string `json:"raw,omitempty"` - Priority string `json:"priority,omitempty"` - Unit string `json:"unit,omitempty"` - PID int `json:"pid,omitempty"` - BootID string `json:"boot_id,omitempty"` - MachineID string `json:"machine_id,omitempty"` + Service string `json:"service,omitempty"` + Timestamp time.Time `json:"timestamp"` + Type string `json:"type"` + Host string `json:"host"` + Tool string `json:"tool,omitempty"` + LogLevel string `json:"log_level"` + LogMessage string `json:"message,omitempty"` + SyslogInfo SyslogFields `json:"syslog_information,omitempty"` + BaseInformation any `json:"base_info"` + ServiceInformation any `json:"service_info"` + SystemMetrics SystemResources `json:"system-metrics"` + ToolInformation any `json:"tool_info"` + Raw string `json:"raw,omitempty"` + Priority string `json:"priority,omitempty"` + PriorityName string `json:"priority_name,omitempty"` + Unit string `json:"unit,omitempty"` + PID int `json:"pid,omitempty"` + BootID string `json:"boot_id,omitempty"` + MachineID string `json:"machine_id,omitempty"` + Fields map[string]any `json:"fields,omitempty"` +} + +type LogMessage struct { + Service string `json:"service"` + Timestamp time.Time `json:"timestamp"` + LogLevel string `json:"log_level"` + LogMessage string `json:"log_message"` + SyslogInfo SyslogFields `json:"syslog_information"` + BaseInformation any `json:"base_info"` + ServiceInformation any `json:"service_info"` + SystemMetrics SystemResources `json:"system-metrics"` + ToolInformation any `json:"tool_info"` +} + +type SyslogFields struct { + SysLogTimestamp time.Time `json:"syslog_timestamp"` + Hostname string `json:"hostname"` + ProcessInfo string `json:"process_info"` + Raw string `json:"raw,omitempty"` + Priority string `json:"priority,omitempty"` + Unit string `json:"unit,omitempty"` + PID int `json:"pid,omitempty"` + BootID string `json:"boot_id,omitempty"` + MachineID string `json:"machine_id,omitempty"` + Fields map[string]any `json:"fields"` } func NewLogEntry(entryType string) LogEntry { @@ -133,7 +165,7 @@ func NewLogEntry(entryType string) LogEntry { Timestamp: time.Now(), Type: entryType, Host: hostname, - Fields: make(map[string]any), + // Fields: make(map[string]any), } } @@ -149,3 +181,97 @@ func NewSystemResources() SystemResources { BandwidthUtilization: make(map[string]BandwidthInfo), } } + +type AMBaseInfo struct { + ProcessID string `json:"process_id"` + ThreadID string `json:"thread_id"` + LoggerName string `json:"logger_name"` +} + +type TCCBaseInfo struct { + ProcessID string `json:"process_id"` + ThreadID string `json:"thread_id"` + LoggerName string `json:"logger_name"` +} + +type NGinXBaseInfo struct { + ClientIP string `json:"client_ip"` + RemoteUser string `json:"remote_user"` + Request string `json:"request"` + StatusCode int `json:"status_code"` + BytesSend int `json:"bytes_sent"` + Referer string `json:"referer"` + UserAgent string `json:"user_agent"` + HTTPMethod string `json:"http_method"` + RequestURI string `json:"request_uri"` + HTTPVersion string `json:"http_version"` + Route string `json:"route"` +} + +type TSBaseInfo struct { + TransferID string `json:"transfer_identifier"` + Lane int `json:"thread"` + Direction string `json:"direction"` + Buffers int `json:"buffers"` + FileCount int `json:"file_count"` + FileSize float64 `json:"file_size"` + ChunkSize int `json:"chunksize"` + Streams int `json:"streams"` + TargetDatarate float64 `json:"target_datarate"` + Protocoll string `json:"protocoll"` + Destination string `json:"destination"` + Sender string `json:"sender_id"` + Target string `json:"transfer_target"` + Source string `json:"source"` + Receiver string `json:"receiver_id"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` +} + +type TSTransferInfo struct { + TransferID string `json:"transfer_identifier"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + TransferLane string `json:"transfer_lane"` + FileCount int `json:"file_count"` + FileSizeMB float64 `json:"file_size_mb"` + DataRateMBs float64 `json:"datarate_mbs"` + Dest string `json:"destination"` + Src string `json:"source"` + SenderID string `json:"sender_id"` + Receiver string `json:"receiver"` + BytesProcessed int64 `json:"bytes_processed"` + Direction string `json:"direction"` + Duration time.Duration `json:"duration_seconds"` + TheoreticalRate float64 `json:"theoretical_rate_mbs"` + Efficiency float64 `json:"efficiency_percent"` + Status string `json:"status"` +} + +type TJMBaseInfo struct { + ProcessID string `json:"process_id"` + TransferID string `json:"transfer_identifier"` + Direction string `json:"direction"` + Username string `json:"username"` + CorrelationID string `json:"correlation_id"` + ThreadID string `json:"thread_id"` + JavaClass string `json:"java_class"` +} + +type TJMTransferInfo struct { + TransferID string `json:"transfer_identifier"` + TransferName string `json:"transfer_name"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + TransferLane string `json:"transfer_lane"` + Dest string `json:"destination"` + Src string `json:"source"` + SenderID string `json:"sender_id"` + Receiver string `json:"receiver"` + Direction string `json:"direction"` + Worker string `json:"worker"` + Duration time.Duration `json:"duration"` + DataRateMBs float64 `json:"datarate_mbs"` + BytesProcessed int64 `json:"bytes_processed"` + FileSizeMB float64 `json:"file_size_mb"` +} diff --git a/service_monitor.go b/service_monitor.go index 120496a..585da81 100644 --- a/service_monitor.go +++ b/service_monitor.go @@ -136,17 +136,18 @@ func (jep *JournalEntryParser) Parse(jsonLine string) (LogEntry, error) { } if msg, ok := journalData["MESSAGE"].(string); ok { - entry.Message = msg + entry.LogMessage = msg } if priority, ok := journalData["PRIORITY"].(string); ok { entry.Priority = priority - entry.Fields["priority_name"] = jep.getPriorityName(priority) + entry.PriorityName = jep.getPriorityName(priority) } if pidStr, ok := journalData["_PID"].(string); ok { if pid, err := strconv.Atoi(pidStr); err == nil { entry.PID = pid + entry.SyslogInfo.ProcessInfo = strconv.FormatInt(int64(pid), 10) } } @@ -197,7 +198,7 @@ func (jep *JournalEntryParser) extractSystemdFields(journalData map[string]any, for _, field := range systemdFields { if value, ok := journalData[field]; ok { esFieldName := strings.ToLower(strings.TrimPrefix(field, "_")) - entry.Fields[esFieldName] = value + entry.SyslogInfo.Fields[esFieldName] = value } } } @@ -237,115 +238,114 @@ var ( func parseTixstreamService(entry LogEntry) LogEntry { newEntry := entry - fields := make(map[string]any) + var baseInfo TSBaseInfo - matches := tsServicePattern.FindStringSubmatch(newEntry.Message) + matches := tsServicePattern.FindStringSubmatch(newEntry.LogMessage) if len(matches) > 0 { timestamp := strings.Join(strings.Split(matches[2], " "), "T") - fields["log_level"] = strings.TrimSpace(matches[1]) - fields["message_timestamp"] = timestamp - fields["log_message"] = strings.TrimSpace(matches[3]) - } else { - fields["log_message"] = newEntry.Message + newEntry.LogLevel = strings.TrimSpace(matches[1]) + if newEntry.Timestamp.IsZero() { + timeParsed, err := parseRFC3339WithOptionalZ(timestamp) + if err != nil { + slog.Error("cant parse time string", "error", err) + } + newEntry.Timestamp = timeParsed + } + newEntry.LogMessage = strings.TrimSpace(matches[3]) } - trNameMatch := tsTransferIDPattern.FindStringSubmatch(fields["log_message"].(string)) + trNameMatch := tsTransferIDPattern.FindStringSubmatch(newEntry.LogMessage) var transferID string if len(trNameMatch) > 0 { transferID = trNameMatch[1] - fields["log_message"] = trNameMatch[2] + newEntry.LogMessage = trNameMatch[2] split := strings.Fields(trNameMatch[2]) switch split[0] { case "in:": - fields["transfer_direction"] = "incoming" + baseInfo.Direction = "incoming" case "out:": - fields["transfer_direction"] = "outgoing" + baseInfo.Direction = "outgoing" } } - msg := strings.ReplaceAll(newEntry.Message, " ", " ") + msg := strings.ReplaceAll(newEntry.LogMessage, " ", " ") parts := strings.Fields(msg) if len(parts) < 5 { return newEntry } - tsDetail := tsDetailPattern1.FindStringSubmatch(fields["log_message"].(string)) + tsDetail := tsDetailPattern1.FindStringSubmatch(newEntry.LogMessage) if len(tsDetail) > 0 { threadInt, _ := strconv.Atoi(strings.Split(tsDetail[1], "/")[0]) - fields["thread"] = threadInt buffersInt, _ := strconv.Atoi(tsDetail[2]) - fields["buffers"] = buffersInt fileCountInt, _ := strconv.Atoi(tsDetail[3]) - fields["file_count"] = fileCountInt - fileSizeInt, _ := strconv.Atoi(tsDetail[4]) - fields["file_size"] = fileSizeInt chunkSizeInt, _ := strconv.Atoi(tsDetail[5]) - fields["chunksize"] = chunkSizeInt streamsInt, _ := strconv.Atoi(tsDetail[6]) - fields["streams"] = streamsInt datarateFloat, _ := strconv.ParseFloat(tsDetail[7], 64) - fields["target_datarate"] = datarateFloat - fields["protocoll"] = tsDetail[8] - fields["destination"] = tsDetail[9] - fields["sender_id"] = tsDetail[10] + fileSizeFloat, _ := strconv.ParseFloat(tsDetail[4], 64) + baseInfo.Lane = threadInt + baseInfo.Buffers = buffersInt + baseInfo.FileCount = fileCountInt + baseInfo.FileSize = fileSizeFloat + baseInfo.ChunkSize = chunkSizeInt + baseInfo.Streams = streamsInt + baseInfo.TargetDatarate = datarateFloat + baseInfo.Protocoll = tsDetail[8] + baseInfo.Destination = tsDetail[9] + baseInfo.Sender = tsDetail[10] } - tsDetail = tsDetailPattern2.FindStringSubmatch(fields["log_message"].(string)) + tsDetail = tsDetailPattern2.FindStringSubmatch(newEntry.LogMessage) if len(tsDetail) > 0 { - fields["transfer_target"] = tsDetail[1] + baseInfo.Target = tsDetail[1] } - tsDetail = tsDetailPattern3.FindStringSubmatch(fields["log_message"].(string)) + tsDetail = tsDetailPattern3.FindStringSubmatch(newEntry.LogMessage) if len(tsDetail) > 0 { threadInt, _ := strconv.Atoi(strings.Split(tsDetail[1], "/")[0]) - fields["thread"] = threadInt buffersInt, _ := strconv.Atoi(tsDetail[2]) - fields["buffers"] = buffersInt fileCountInt, _ := strconv.Atoi(tsDetail[3]) - fields["file_count"] = fileCountInt - fileSizeInt, _ := strconv.Atoi(tsDetail[4]) - fields["file_size"] = fileSizeInt + fileSizeFloat, _ := strconv.ParseFloat(tsDetail[4], 64) chunkSizeInt, _ := strconv.Atoi(tsDetail[5]) - fields["chunksize"] = chunkSizeInt streamsInt, _ := strconv.Atoi(tsDetail[6]) - fields["streams"] = streamsInt datarateFloat, _ := strconv.ParseFloat(tsDetail[7], 64) - fields["target_datarate"] = datarateFloat - fields["protocoll"] = tsDetail[8] - fields["source"] = tsDetail[9] - fields["receiver_id"] = tsDetail[10] + baseInfo.Lane = threadInt + baseInfo.Buffers = buffersInt + baseInfo.FileCount = fileCountInt + baseInfo.FileSize = fileSizeFloat + baseInfo.ChunkSize = chunkSizeInt + baseInfo.Streams = streamsInt + baseInfo.TargetDatarate = datarateFloat + baseInfo.Protocoll = tsDetail[8] + baseInfo.Source = tsDetail[9] + baseInfo.Receiver = tsDetail[10] } - tsDetail = tsDetailPattern4.FindStringSubmatch(fields["log_message"].(string)) + tsDetail = tsDetailPattern4.FindStringSubmatch(newEntry.LogMessage) if len(tsDetail) > 0 { threadInt, _ := strconv.Atoi(strings.Split(tsDetail[1], "/")[0]) - fields["thread"] = threadInt - fields["source"] = tsDetail[2] - fields["destination"] = tsDetail[3] - fields["item"] = tsDetail[4] - fields["count"] = tsDetail[5] + baseInfo.Lane = threadInt + baseInfo.Source = tsDetail[2] + baseInfo.Destination = tsDetail[3] } - if strings.Contains(fields["log_message"].(string), "Transfer start") || strings.Contains(fields["log_message"].(string), "Transfer started,") { - fields["event"] = "transfer_started" - fields["start_time"] = fields["message_timestamp"] + if strings.Contains(newEntry.LogMessage, "Transfer start") || strings.Contains(newEntry.LogMessage, "Transfer started,") { + baseInfo.StartTime = newEntry.Timestamp } - if strings.Contains(fields["log_message"].(string), "Transfer stopped local state=finished") { - fields["event"] = "transfer_stopped" - fields["end_time"] = fields["message_timestamp"] + if strings.Contains(newEntry.LogMessage, "Transfer stopped local state=finished") { + baseInfo.EndTime = newEntry.Timestamp } - // value, ok := fields["transfer_id"] if transferID != "" { - fields["transfer_identifier"] = transferID + baseInfo.TransferID = transferID + } else { + baseInfo.TransferID = "no_transfer_id" } - if fields["transfer_identifier"] == nil { - fields["transfer_identifier"] = "unknown" + if !baseInfo.StartTime.IsZero() { + newEntry.BaseInformation = baseInfo } - if fields["message_timestamp"] == nil { - fields["message_timestamp"] = newEntry.Timestamp - } - newEntry.Fields = fields return newEntry } func parseTJMService(entry LogEntry) LogEntry { newEntry := entry - logContent := entry.Message + var baseInfo TJMBaseInfo + + logContent := entry.LogMessage msg := strings.TrimSpace(logContent) msg = strings.ReplaceAll(msg, " ", " ") msg = strings.ReplaceAll(msg, "---", "") @@ -354,156 +354,160 @@ func parseTJMService(entry LogEntry) LogEntry { if len(parts) < 4 { return newEntry } - fields := make(map[string]any) matches := tjmServicePattern.FindStringSubmatch(logContent) if len(matches) > 0 { - timestamp := strings.Join(strings.Split(matches[1], " "), "T") - fields["message_timestamp"] = timestamp - fields["log_level"] = strings.TrimSpace(matches[2]) - fields["pid"] = strings.TrimSpace(matches[3]) - fields["correlation_id"] = strings.TrimSpace(matches[4]) - fields["username"] = strings.TrimSpace(matches[5]) - fields["thread_id"] = strings.TrimSpace(matches[6]) - fields["java_class"] = strings.TrimSpace(matches[7]) - fields["log_message"] = strings.TrimSpace(matches[8]) + timestamp := strings.Join(strings.Split(matches[2], " "), "T") + newEntry.LogLevel = strings.TrimSpace(matches[1]) + if newEntry.Timestamp.IsZero() { + timeParsed, err := parseRFC3339WithOptionalZ(timestamp) + if err != nil { + slog.Error("cant parse time string", "error", err) + } + newEntry.Timestamp = timeParsed + } + newEntry.LogLevel = strings.TrimSpace(matches[2]) + newEntry.LogMessage = strings.TrimSpace(matches[8]) + baseInfo = TJMBaseInfo{ + ProcessID: strings.TrimSpace(matches[3]), + CorrelationID: strings.TrimSpace(matches[4]), + Username: strings.TrimSpace(matches[5]), + ThreadID: strings.TrimSpace(matches[6]), + JavaClass: strings.TrimSpace(matches[7]), + } } else { - fields["log_message"] = logContent + newEntry.LogMessage = logContent } - trNameMatch := tjmTransferNamePattern.FindStringSubmatch(fields["log_message"].(string)) + trNameMatch := tjmTransferNamePattern.FindStringSubmatch(newEntry.LogMessage) var transferName string var transferID string if len(trNameMatch) > 0 { transferName = trNameMatch[1] - fields["log_message"] = trNameMatch[2] + newEntry.LogMessage = trNameMatch[2] if strings.Contains(trNameMatch[1], "-in") { - fields["transfer_direction"] = "incoming" + baseInfo.Direction = "incoming" } if strings.Contains(trNameMatch[1], "-out") { - fields["transfer_direction"] = "outgoing" + baseInfo.Direction = "outgoing" } } - trIDMatch := tjmTransferIDPattern1.FindStringSubmatch(fields["log_message"].(string)) + trIDMatch := tjmTransferIDPattern1.FindStringSubmatch(newEntry.LogMessage) if len(trIDMatch) > 0 { transferID = trIDMatch[1] } - trIDMatch = tjmTransferIDPattern2.FindStringSubmatch(fields["log_message"].(string)) + trIDMatch = tjmTransferIDPattern2.FindStringSubmatch(newEntry.LogMessage) if len(trIDMatch) > 0 { transferID = trIDMatch[2] } - // value, ok := fields["transfer_id"] if transferID != "" { - fields["transfer_identifier"] = transferID + baseInfo.TransferID = transferID } else if transferName != "" { - // value, ok := fields["transfer_name"] - // if ok { - fields["transfer_identifier"] = transferName - // } + baseInfo.TransferID = transferName + } else { + baseInfo.TransferID = "no_transfer_id" } - if fields["transfer_identifier"] == nil { - fields["transfer_identifier"] = "unknown" - } - if fields["message_timestamp"] == nil { - fields["message_timestamp"] = newEntry.Timestamp - } - - newEntry.Fields = fields + newEntry.BaseInformation = baseInfo return newEntry } func parseAMService(entry LogEntry) LogEntry { newEntry := entry - if newEntry.Fields == nil { - newEntry.Fields = make(map[string]any) - } - fields := make(map[string]any) + logContent := newEntry.LogMessage - matches := amServicePattern.FindStringSubmatch(strings.TrimSpace(entry.Message)) + matches := amServicePattern.FindStringSubmatch(strings.TrimSpace(logContent)) if len(matches) != 7 { return newEntry } - timestamp := strings.Join(strings.Split(matches[1], " "), "T") - fields["message_timestamp"] = timestamp - fields["log_level"] = matches[2] - fields["process_id"] = matches[3] - fields["thread_id"] = strings.TrimSpace(matches[4]) - fields["logger_name"] = matches[5] - fields["log_message"] = matches[6] - - fields["transfer_identifier"] = "unknown" - if fields["message_timestamp"] == nil { - fields["message_timestamp"] = newEntry.Timestamp + timestampStr := strings.Join(strings.Split(matches[1], " "), "T") + if newEntry.Timestamp.IsZero() { + timeParsed, err := parseRFC3339WithOptionalZ(timestampStr) + if err != nil { + slog.Error("cant parse time string", "error", err) + } + newEntry.Timestamp = timeParsed } - newEntry.Fields = fields + baseInfo := AMBaseInfo{ + ProcessID: matches[3], + ThreadID: strings.TrimSpace(matches[4]), + LoggerName: matches[5], + } + newEntry.LogLevel = matches[2] + newEntry.LogMessage = matches[6] + newEntry.BaseInformation = baseInfo return newEntry } func parseTCCService(entry LogEntry) LogEntry { newEntry := entry - if newEntry.Fields == nil { - newEntry.Fields = make(map[string]any) - } + logContent := newEntry.LogMessage - fields := make(map[string]any) - matches := tccServicePattern.FindStringSubmatch(strings.TrimSpace(entry.Message)) + matches := tccServicePattern.FindStringSubmatch(logContent) if len(matches) != 7 { return newEntry } - timestamp := strings.Join(strings.Split(matches[1], " "), "T") - fields["message_timestamp"] = timestamp - fields["log_level"] = matches[2] - fields["process_id"] = matches[3] - fields["thread_id"] = strings.TrimSpace(matches[4]) - fields["logger_name"] = matches[5] - fields["log_message"] = matches[6] - - fields["transfer_identifier"] = "unknown" - if fields["message_timestamp"].(string) == "" { - fields["message_timestamp"] = newEntry.Timestamp + timestampStr := strings.Join(strings.Split(matches[1], " "), "T") + if newEntry.Timestamp.IsZero() { + timeParsed, err := parseRFC3339WithOptionalZ(timestampStr) + if err != nil { + slog.Error("cant parse time string", "error", err) + } + newEntry.Timestamp = timeParsed } - newEntry.Fields = fields - - newEntry.Fields["timestamp"] = matches[1] - newEntry.Fields["log_level"] = matches[2] - newEntry.Fields["process_id"] = matches[3] - newEntry.Fields["thread_name"] = strings.TrimSpace(matches[4]) - newEntry.Fields["logger_name"] = matches[5] - newEntry.Fields["log_message"] = matches[6] + baseInfo := TCCBaseInfo{ + ProcessID: matches[3], + ThreadID: strings.TrimSpace(matches[4]), + LoggerName: matches[5], + } + newEntry.LogLevel = matches[2] + newEntry.LogMessage = matches[6] + newEntry.BaseInformation = baseInfo return newEntry } func parseNginxService(entry LogEntry) LogEntry { newEntry := entry - if newEntry.Fields == nil { - newEntry.Fields = make(map[string]any) - } - matches := nginxAccessPattern.FindStringSubmatch(strings.TrimSpace(entry.Message)) + matches := nginxAccessPattern.FindStringSubmatch(strings.TrimSpace(entry.LogMessage)) if len(matches) < 7 { return newEntry } - - newEntry.Fields["client_ip"] = matches[1] - newEntry.Fields["remote_user"] = matches[2] - newEntry.Fields["timestamp"] = matches[3] - newEntry.Fields["request"] = matches[4] - newEntry.Fields["status_code"] = matches[5] - newEntry.Fields["bytes_sent"] = matches[6] + statusCode, err := strconv.ParseInt(matches[5], 10, 64) + if err != nil { + slog.Error("cant parse statuscode", "error", err) + } + bytesSend, err := strconv.ParseInt(matches[6], 10, 64) + if err != nil { + slog.Error("cant parse bytessend", "error", err) + } + baseInfo := NGinXBaseInfo{ + ClientIP: matches[1], + RemoteUser: matches[2], + Request: matches[4], + StatusCode: int(statusCode), + BytesSend: int(bytesSend), + } if len(matches) > 7 && matches[7] != "" { - newEntry.Fields["referer"] = matches[7] + baseInfo.Referer = matches[7] } if len(matches) > 8 && matches[8] != "" { - newEntry.Fields["user_agent"] = matches[8] + baseInfo.UserAgent = matches[8] } if requestParts := strings.Fields(matches[4]); len(requestParts) >= 3 { - newEntry.Fields["http_method"] = requestParts[0] - newEntry.Fields["request_uri"] = requestParts[1] - newEntry.Fields["http_version"] = requestParts[2] + baseInfo.HTTPMethod = requestParts[0] + baseInfo.RequestURI = requestParts[1] + baseInfo.HTTPVersion = requestParts[2] } return newEntry } + +func parseRFC3339WithOptionalZ(timeStr string) (time.Time, error) { + if !strings.HasSuffix(timeStr, "Z") && !strings.ContainsAny(timeStr[len(timeStr)-6:], "+-") { + timeStr += "Z" + } + return time.Parse(time.RFC3339Nano, timeStr) +} diff --git a/system_metrics.go b/system_metrics.go index b75e564..a9d0008 100644 --- a/system_metrics.go +++ b/system_metrics.go @@ -389,18 +389,6 @@ func (smc *SystemMetricsCollector) collectSystemLimits(result *SystemResources) return nil } -// // Hilfsfunktionen -// func NewSystemResources() SystemResources { -// return SystemResources{ -// Timestamp: time.Now(), -// DiskUsage: make(map[string]DiskUsage), -// DiskIOStats: make(map[string]DiskIOStat), -// NetworkStats: make(map[string]NetworkStat), -// NetworkLatency: make(map[string]LatencyInfo), -// BandwidthUtilization: make(map[string]BandwidthInfo), -// } -// } - func (smc *SystemMetricsCollector) collectProcessMetrics(result *SystemResources) error { processes, err := process.Processes() if err != nil { diff --git a/web_service.go b/web_service.go index 12d956a..8669489 100644 --- a/web_service.go +++ b/web_service.go @@ -77,14 +77,15 @@ func (ws *WebService) handleExport(w http.ResponseWriter, r *http.Request) { } size := ws.parseSizeParam(r) + since := ws.parseSinceParam(r) - slog.Info("Export request received", "indices", indices, "size", size) + slog.Info("Export request received", "indices", indices, "size", size, "since", since) w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Disposition", "attachment; filename=elasticsearch_export.json") exporter := NewElasticsearchExporter(ws.esClient) - if err := exporter.ExportToStream(r.Context(), indices, size, w); err != nil { + if err := exporter.ExportToStream(r.Context(), indices, size, since, w); err != nil { slog.Error("export error", "error", err) http.Error(w, fmt.Sprintf("Export error: %v", err), http.StatusInternalServerError) return @@ -204,6 +205,20 @@ func (ws *WebService) parseIndicesParam(r *http.Request) []string { return result } +func (ws *WebService) parseSinceParam(r *http.Request) int { + sinceParam := r.URL.Query().Get("since") + if sinceParam == "" { + return 0 + } + + since, err := strconv.Atoi(sinceParam) + if err != nil { + return 0 + } + + return since +} + func (ws *WebService) parseSizeParam(r *http.Request) int { sizeParam := r.URL.Query().Get("size") if sizeParam == "" {