feat: delete old specific parsers and use new generic parser in file_monitor

This commit is contained in:
Patryk Hegenberg 2026-01-18 12:54:57 +01:00
parent 0830b403e0
commit 794180c6ab
9 changed files with 21 additions and 595 deletions

View file

@ -8,6 +8,7 @@ import (
"strings"
"watch-tool/models"
"watch-tool/parser"
"watch-tool/patterns"
"github.com/hpcloud/tail"
)
@ -22,22 +23,28 @@ func NewFileMonitor(config ToolConfig, hostname string) *FileMonitor {
var logParser parser.Parser
if config.Format.Pattern != "" {
pattern, err := regexp.Compile(config.Format.Pattern)
compiledRegex, err := regexp.Compile(config.Format.Pattern)
if err != nil {
slog.Error("invalid regex pattern", "tool", config.Name, "error", err)
logParser = &parser.DefaultParser{}
slog.Error("Invalid regex pattern in tool config", "tool", config.Name, "error", err)
logParser = parser.NewGenericParser(config.Name, hostname)
} else {
logParser = &parser.RegexLogParser{
Pattern: pattern,
Fields: config.Format.Fields,
Toolname: config.Name,
gp := parser.NewGenericParser(config.Name, hostname)
customExtractor := patterns.CompiledExtractor{
Name: "config_custom_pattern",
Pattern: compiledRegex,
Fields: config.Format.Fields,
}
gp.Extractors = append(gp.Extractors, customExtractor)
logParser = gp
}
} else {
var err error
logParser, err = parser.New(config.Name, "custom", hostname)
if err != nil {
slog.Error("cannot get tool specific parser", "error", err)
slog.Error("Cannot get tool specific parser from factory", "error", err)
logParser = parser.NewGenericParser(config.Name, hostname)
}
}
@ -74,7 +81,7 @@ func (fm *FileMonitor) Start(ctx context.Context, out chan<- models.LogMessage)
}
if line.Err != nil {
slog.Error("error reading log file", "tool", fm.config.Name, "error", line.Err)
slog.Error("Error reading log file", "tool", fm.config.Name, "error", line.Err)
continue
}
@ -84,7 +91,11 @@ func (fm *FileMonitor) Start(ctx context.Context, out chan<- models.LogMessage)
entry, err := fm.parser.Parse(line.Text)
if err != nil {
slog.Error("error parsing log line", "error", err)
slog.Error("Error parsing log line", "tool", fm.config.Name, "error", err)
} else {
if entry.Tool == "" {
entry.Tool = fm.config.Name
}
}
select {

View file

@ -1,49 +0,0 @@
package parser
import (
"log/slog"
"regexp"
"strings"
"time"
"watch-tool/helpers"
"watch-tool/models"
)
var (
// amServicePattern matches a complete access-manager log line (anchored at both ends).
// Capture groups, in order:
//   1. timestamp of the form YYYY-MM-DDTHH:MM:SS[.fraction]Z
//   2. log level (word characters)
//   3. numeric id (stored as ProcessID by AMParser.Parse)
//   4. text between "[" and "]" following the "---" separator (stored as ThreadID)
//   5. logger name (word characters and dots)
//   6. the message after the ":" separator
amServicePattern = regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z)\s+(\w+)\s+(\d+)\s+---\s+\[\s*([^\]]*)\]\s+([\w\.]+)\s*:\s*(.*)$`)
)
// AMParser converts access-manager log lines into models.LogMessage values.
type AMParser struct{}

// Parse builds a LogMessage for a single access-manager log line.
//
// The line is first passed through helpers.ExtractSyslogHeader; the hostname
// from that header becomes the entry's Host. When the remaining content does
// not match amServicePattern, the entry carries the current time and the
// unmodified input line as its message. When the timestamp cannot be parsed,
// the error is logged and returned together with the entry built so far.
func (a *AMParser) Parse(line string) (models.LogMessage, error) {
	header, payload := helpers.ExtractSyslogHeader(line)
	entry := models.LogMessage{
		Service: "access-manager",
		Host:    header.Hostname,
	}

	groups := amServicePattern.FindStringSubmatch(strings.TrimSpace(payload))
	if len(groups) != 7 {
		// Not a structured line: keep the raw input and stamp it with "now".
		entry.Timestamp = time.Now()
		entry.LogMessage = line
		return entry, nil
	}

	// Turn a space-separated date/time into the "T"-separated form.
	ts, err := helpers.ParseRFC3339WithOptionalZ(strings.ReplaceAll(groups[1], " ", "T"))
	if err != nil {
		slog.Error("unable to parse time", "error", err)
		return entry, err
	}

	entry.Timestamp = ts
	entry.LogLevel = groups[2]
	entry.LogMessage = groups[6]
	entry.ServiceInformation = models.AMBaseInfo{
		ProcessID:  groups[3],
		ThreadID:   strings.TrimSpace(groups[4]),
		LoggerName: groups[5],
	}
	return entry, nil
}

View file

@ -1,29 +0,0 @@
package parser
import (
"strings"
"time"
"watch-tool/models"
)
// DefaultParser is a fallback parser that performs no pattern matching: every
// line becomes a LogMessage with log level "unknown".
type DefaultParser struct {
	// Service, when non-empty, is copied to LogMessage.Service.
	Service string
	// Tool, when non-empty, is copied to LogMessage.Tool.
	Tool string
	// Hostname, when non-empty, is copied to LogMessage.Host.
	Hostname string
}

// Parse wraps line in a LogMessage without interpreting its content.
//
// The message text is the whitespace-trimmed line, Raw keeps the line
// unchanged, and Timestamp is the time of the call. Parse never returns an
// error.
func (d *DefaultParser) Parse(line string) (models.LogMessage, error) {
	msg := models.LogMessage{
		LogLevel:   "unknown",
		LogMessage: strings.TrimSpace(line),
		Raw:        line,
		Timestamp:  time.Now(),
	}
	if d.Service != "" {
		msg.Service = d.Service
	}
	if d.Tool != "" {
		msg.Tool = d.Tool
	}
	// Propagate the configured hostname; previously the field was declared
	// but silently ignored, unlike the other parsers which populate Host.
	if d.Hostname != "" {
		msg.Host = d.Hostname
	}
	return msg, nil
}

View file

@ -1,117 +1,3 @@
// package parser
// import (
// "fmt"
// "strconv"
// "strings"
// "time"
// "watch-tool/models"
// "watch-tool/patterns"
// )
// type GenericParser struct {
// ServiceName string
// Hostname string
// Extractors []patterns.CompiledExtractor
// CommonExt []patterns.CompiledExtractor
// }
// func NewGenericParser(serviceName, hostname string) *GenericParser {
// repo := patterns.GetInstance()
// return &GenericParser{
// ServiceName: serviceName,
// Hostname: hostname,
// Extractors: repo.GetExtractors(serviceName),
// CommonExt: repo.GetExtractors("common"),
// }
// }
// func (p *GenericParser) Parse(line string) (models.LogMessage, error) {
// entry := models.LogMessage{
// Service: p.ServiceName,
// Host: p.Hostname,
// Timestamp: time.Now(),
// Raw: line,
// Fields: make(map[string]any),
// }
// // 1. Common Extractors laufen lassen (z.B. Syslog Header entfernen/parsen)
// // Wir nutzen eine temporäre Variable für den Rest-String, falls Header entfernt werden soll
// currentLine := line
// // Hinweis: Hier könnte man Syslog-Logik generisch einbauen.
// // Fürs Erste wenden wir Pattern einfach auf die Zeile an.
// // 2. Service Extractors anwenden
// // Wir probieren ALLE Extractors, um maximale Informationen zu gewinnen.
// // Das simuliert die Logik deiner alten Parser (erst Header, dann Details).
// allExtractors := append(p.CommonExt, p.Extractors...)
// for _, ext := range allExtractors {
// matches := ext.Pattern.FindStringSubmatch(currentLine)
// if matches == nil {
// continue
// }
// subexpNames := ext.Pattern.SubexpNames()
// for i, matchValue := range matches {
// if i == 0 || matchValue == "" {
// continue
// }
// groupName := subexpNames[i]
// if groupName == "" {
// continue
// }
// targetType := ext.Fields[groupName]
// parsedValue, err := convertType(matchValue, targetType)
// if err == nil {
// switch groupName {
// case "timestamp":
// if t, ok := parsedValue.(time.Time); ok {
// entry.Timestamp = t
// }
// case "log_level":
// entry.LogLevel = fmt.Sprintf("%v", parsedValue)
// case "message":
// entry.LogMessage = fmt.Sprintf("%v", parsedValue)
// default:
// entry.Fields[groupName] = parsedValue
// }
// }
// }
// }
// if entry.LogMessage == "" {
// entry.LogMessage = strings.TrimSpace(line)
// }
// return entry, nil
// }
// func convertType(value, typeDef string) (any, error) {
// if strings.HasPrefix(typeDef, "int") {
// return strconv.Atoi(value)
// }
// if strings.HasPrefix(typeDef, "float") {
// return strconv.ParseFloat(value, 64)
// }
// if after, ok := strings.CutPrefix(typeDef, "time:"); ok {
// layout := after
// // Workaround für Syslog (Jahr fehlt oft), hier vereinfacht:
// if layout == "Jan 02 15:04:05" {
// t, err := time.Parse(layout, value)
// if err == nil {
// return t.AddDate(time.Now().Year(), 0, 0), nil
// }
// return t, err
// }
// return time.Parse(layout, value)
// }
// // Default: String
// return value, nil
// }
package parser
import (
@ -287,8 +173,6 @@ func (p *GenericParser) mapField(entry *models.LogMessage, key string, value any
entry.PID = pid
}
}
// Mapping auf ServiceInformation Felder (Optional, falls nötig)
// case "transfer_id": ...
default:
entry.Fields[key] = value

View file

@ -1,57 +0,0 @@
package parser
import (
"log/slog"
"regexp"
"strconv"
"strings"
"watch-tool/models"
)
var (
// nginxAccessPattern matches the start of an nginx access log line.
// Capture groups, in order:
//   1. first token (stored as ClientIP by NginxParser.Parse)
//   2. third token (stored as RemoteUser); the second token is skipped
//   3. text between "[" and "]"
//   4. the double-quoted request
//   5. numeric status code
//   6. byte count, or "-"
//   7. optional double-quoted referer
//   8. optional double-quoted user agent
nginxAccessPattern = regexp.MustCompile(`^(\S+)\s+\S+\s+(\S+)\s+\[([^\]]+)\]\s+"([^"]+)"\s+(\d+)\s+(\d+|-)\s*(?:"([^"]*)"\s+"([^"]*)")?`)
)
// NginxParser converts nginx access log lines into models.LogMessage values.
type NginxParser struct{}

// Parse extracts the request details of a single nginx access log line into
// a models.NGinXBaseInfo stored in the entry's ServiceInformation.
//
// Lines that do not match nginxAccessPattern yield an entry that only has
// Service set. Numeric fields that cannot be parsed are logged and left at
// zero. Parse never returns an error.
func (n *NginxParser) Parse(line string) (models.LogMessage, error) {
	newEntry := models.LogMessage{
		Service: "nginx",
	}

	matches := nginxAccessPattern.FindStringSubmatch(strings.TrimSpace(line))
	if len(matches) < 7 {
		return newEntry, nil
	}

	statusCode, err := strconv.ParseInt(matches[5], 10, 64)
	if err != nil {
		slog.Error("cant parse statuscode", "error", err)
	}

	// The pattern explicitly allows "-" as the byte count. Treat it as zero
	// instead of feeding it to ParseInt, which would log an error for every
	// such (valid) line.
	var bytesSend int64
	if matches[6] != "-" {
		bytesSend, err = strconv.ParseInt(matches[6], 10, 64)
		if err != nil {
			slog.Error("cant parse bytessend", "error", err)
		}
	}

	baseInfo := models.NGinXBaseInfo{
		ClientIP:   matches[1],
		RemoteUser: matches[2],
		Request:    matches[4],
		StatusCode: int(statusCode),
		BytesSend:  int(bytesSend),
	}

	// Referer and user agent belong to an optional trailing group and are
	// empty when the line does not carry them.
	if len(matches) > 7 && matches[7] != "" {
		baseInfo.Referer = matches[7]
	}
	if len(matches) > 8 && matches[8] != "" {
		baseInfo.UserAgent = matches[8]
	}

	// Split "<method> <uri> <version>" when the request has at least three parts.
	if requestParts := strings.Fields(matches[4]); len(requestParts) >= 3 {
		baseInfo.HTTPMethod = requestParts[0]
		baseInfo.RequestURI = requestParts[1]
		baseInfo.HTTPVersion = requestParts[2]
	}

	newEntry.ServiceInformation = baseInfo
	return newEntry, nil
}

View file

@ -1,58 +0,0 @@
package parser
import (
"regexp"
"strings"
"watch-tool/models"
)
// RegexLogParser parses log lines with a single user-supplied regular expression.
type RegexLogParser struct {
// Pattern is matched against every line; only its named capture groups are extracted.
Pattern *regexp.Regexp
// Fields optionally renames capture groups: key is the group name, value is
// the field name used in the resulting LogMessage.Fields map.
Fields map[string]string
// Toolname is copied to LogMessage.Tool.
Toolname string
// Hostname is copied to LogMessage.Host.
Hostname string
}
// Parse turns line into a LogMessage of type "log_entry".
//
// When the configured pattern matches, the named capture groups are stored in
// Fields and LogMessage is left empty; otherwise LogMessage holds the
// whitespace-trimmed line. Parse never returns an error.
func (p *RegexLogParser) Parse(line string) (models.LogMessage, error) {
	entry := models.LogMessage{
		Type: "log_entry",
		Tool: p.Toolname,
		Raw:  line,
		Host: p.Hostname,
	}

	if extracted := p.parseWithPattern(line); extracted != nil {
		entry.Fields = extracted
		return entry, nil
	}

	entry.LogMessage = strings.TrimSpace(line)
	return entry, nil
}
// parseWithPattern applies p.Pattern to text and returns the values of its
// named capture groups keyed by group name, or by the replacement name from
// p.Fields when one is configured. Unnamed groups are ignored. It returns nil
// when the pattern does not match.
func (p *RegexLogParser) parseWithPattern(text string) map[string]any {
	groups := p.Pattern.FindStringSubmatch(text)
	if groups == nil {
		return nil
	}

	names := p.Pattern.SubexpNames()
	result := make(map[string]any)
	// Index 0 is the whole match, so start at the first capture group.
	for idx := 1; idx < len(groups) && idx < len(names); idx++ {
		key := names[idx]
		if key == "" {
			continue
		}
		if alias, ok := p.Fields[key]; ok {
			key = alias
		}
		result[key] = groups[idx]
	}
	return result
}

View file

@ -1,49 +0,0 @@
package parser
import (
"log/slog"
"regexp"
"strings"
"time"
"watch-tool/helpers"
"watch-tool/models"
)
var (
// tccServicePattern matches a complete tixel-control-center log line (anchored at both ends).
// Capture groups, in order:
//   1. timestamp of the form YYYY-MM-DDTHH:MM:SS[.fraction]Z
//   2. log level (word characters)
//   3. numeric id (stored as ProcessID by TCCParser.Parse)
//   4. text between "[" and "]" following the "---" separator (stored as ThreadID)
//   5. logger name (word characters and dots)
//   6. the message after the ":" separator
tccServicePattern = regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z)\s+(\w+)\s+(\d+)\s+---\s+\[\s*([^\]]*)\]\s+([\w\.]+)\s*:\s*(.*)$`)
)
// TCCParser converts tixel-control-center log lines into models.LogMessage values.
type TCCParser struct{}

// Parse builds a LogMessage for a single tixel-control-center log line.
//
// The line is first passed through helpers.ExtractSyslogHeader; the hostname
// from that header becomes the entry's Host. When the remaining content does
// not match tccServicePattern, the entry carries the current time and the
// unmodified input line as its message. When the timestamp cannot be parsed,
// the error is logged and returned together with the entry built so far.
func (t *TCCParser) Parse(line string) (models.LogMessage, error) {
	header, payload := helpers.ExtractSyslogHeader(line)
	entry := models.LogMessage{
		Service: "tixel-control-center",
		Host:    header.Hostname,
	}

	groups := tccServicePattern.FindStringSubmatch(strings.TrimSpace(payload))
	if len(groups) != 7 {
		// Not a structured line: keep the raw input and stamp it with "now".
		entry.Timestamp = time.Now()
		entry.LogMessage = line
		return entry, nil
	}

	// Turn a space-separated date/time into the "T"-separated form.
	ts, err := helpers.ParseRFC3339WithOptionalZ(strings.ReplaceAll(groups[1], " ", "T"))
	if err != nil {
		slog.Error("unable to parse time", "error", err)
		return entry, err
	}

	entry.Timestamp = ts
	entry.LogLevel = groups[2]
	entry.LogMessage = groups[6]
	entry.ServiceInformation = models.TCCBaseInfo{
		ProcessID:  groups[3],
		ThreadID:   strings.TrimSpace(groups[4]),
		LoggerName: groups[5],
	}
	return entry, nil
}

View file

@ -1,91 +0,0 @@
package parser
import (
"log/slog"
"regexp"
"strings"
"watch-tool/helpers"
"watch-tool/models"
)
var (
// tjmServicePattern matches the structured prefix of a transfer-job-manager
// line: "YYYY-MM-DD HH:MM:SS.mmm", level, numeric pid, three bracketed
// values, a class name and, after " : ", the message. TJMParser.Parse reads
// the groups by position (1 = time ... 8 = message).
tjmServicePattern = regexp.MustCompile(`^(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\s+(?<level>\S+)\s+(?<pid>\d+).*?\[(?<collatation_id>[^\]]*)\]\s+\[(?<username>[^\]]*)\]\s+\[(?<thread>[^\]]*)\]\s+(?<class>.*?)\s+:\s+(?<message>.*)`)
// tjmTransferNamePattern matches a message that starts with a transfer name
// ("<8 digits>T<6 digits>-<alnum>-<text>-in" or "...-out") followed by ":".
// Group 1 is the transfer name, group 2 the remaining message.
tjmTransferNamePattern = regexp.MustCompile(`^(\d{8}T\d{6}-[A-Za-z0-9]+-.+?-(?:in|out)) ?: (.*)$`)
// tjmTransferIDPattern1 finds a token shaped 8-4-4-4-12 word characters
// anywhere in the message (group 1).
tjmTransferIDPattern1 = regexp.MustCompile(`(?P<transfer>\w{8}-\w{4}-\w{4}-\w{4}-\w{12}).*?(?P<message>.*)`)
// tjmTransferIDPattern2 is the same token search preceded by a greedy
// "before" group, so the token ends up in group 2.
tjmTransferIDPattern2 = regexp.MustCompile(`(?P<before>.*)(?P<transfer>\w{8}-\w{4}-\w{4}-\w{4}-\w{12}).*?(?P<message>.*)`)
)
// TJMParser converts transfer-job-manager log lines into models.LogMessage values.
type TJMParser struct{}

// Parse builds a LogMessage for a single transfer-job-manager log line and
// attaches a models.TJMTransferInfo as ServiceInformation.
//
// The line is first passed through helpers.ExtractSyslogHeader. Content with
// fewer than four tokens is returned as a plain message without service
// information. Otherwise the structured prefix, an optional leading transfer
// name and an optional transfer id token are extracted from the content.
// A timestamp that cannot be parsed is logged and left at its zero value.
// Parse never returns an error.
func (t *TJMParser) Parse(line string) (models.LogMessage, error) {
	newEntry := models.LogMessage{
		Service: "transfer-job-manager",
	}

	syslogFields, logContent := helpers.ExtractSyslogHeader(line)
	newEntry.Host = syslogFields.Hostname

	// Count tokens without the "---" separator. strings.Fields already splits
	// on runs of whitespace, so no whitespace normalisation is needed first.
	if len(strings.Fields(strings.ReplaceAll(logContent, "---", ""))) < 4 {
		newEntry.LogMessage = logContent
		return newEntry, nil
	}

	var baseInfo models.TJMTransferInfo
	if matches := tjmServicePattern.FindStringSubmatch(logContent); len(matches) > 0 {
		// The log uses "date time"; the parser helper expects "dateTtime".
		timestamp, err := helpers.ParseRFC3339WithOptionalZ(strings.ReplaceAll(matches[1], " ", "T"))
		if err != nil {
			slog.Error("unable to parse time", "error", err)
		}
		newEntry.Timestamp = timestamp
		newEntry.LogLevel = strings.TrimSpace(matches[2])
		newEntry.LogMessage = strings.TrimSpace(matches[8])
		baseInfo = models.TJMTransferInfo{
			ProcessID:     strings.TrimSpace(matches[3]),
			CorrelationID: strings.TrimSpace(matches[4]),
			Username:      strings.TrimSpace(matches[5]),
			ThreadID:      strings.TrimSpace(matches[6]),
			JavaClass:     strings.TrimSpace(matches[7]),
		}
	} else {
		newEntry.LogMessage = logContent
	}

	var transferName, transferID string
	if nameMatch := tjmTransferNamePattern.FindStringSubmatch(newEntry.LogMessage); len(nameMatch) > 0 {
		transferName = nameMatch[1]
		newEntry.LogMessage = nameMatch[2]
		// The pattern guarantees the name ends in "-in" or "-out". Check the
		// suffix rather than any occurrence, so that a name such as
		// "...-outer-in" is not misclassified as outgoing.
		switch {
		case strings.HasSuffix(transferName, "-in"):
			baseInfo.Direction = "incoming"
		case strings.HasSuffix(transferName, "-out"):
			baseInfo.Direction = "outgoing"
		}
	}

	// Both id patterns are tried; a match of the second one takes precedence.
	if idMatch := tjmTransferIDPattern1.FindStringSubmatch(newEntry.LogMessage); len(idMatch) > 0 {
		transferID = idMatch[1]
	}
	if idMatch := tjmTransferIDPattern2.FindStringSubmatch(newEntry.LogMessage); len(idMatch) > 0 {
		transferID = idMatch[2]
	}

	// Prefer the id token, then the transfer name, then a fixed placeholder.
	switch {
	case transferID != "":
		baseInfo.TransferID = transferID
	case transferName != "":
		baseInfo.TransferID = transferName
	default:
		baseInfo.TransferID = "no_transfer_id"
	}

	if baseInfo.StartTime.IsZero() {
		baseInfo.StartTime = newEntry.Timestamp
	}

	newEntry.ServiceInformation = baseInfo
	return newEntry, nil
}

View file

@ -1,136 +0,0 @@
package parser
import (
"log/slog"
"regexp"
"strconv"
"strings"
"watch-tool/helpers"
"watch-tool/models"
)
var (
// tsServicePattern matches the structured prefix of a tixstream line:
// level, "YYYY-MM-DD HH:MM:SS.uuuuuu" timestamp, then the message.
tsServicePattern = regexp.MustCompile(`^(?<level>\S+)\s+(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6})\s+(?<message>.*)`)
// tsTransferIDPattern matches a message that starts with a token shaped
// 8-4-4-4-12 word characters (group 1) followed by the remaining message (group 2).
tsTransferIDPattern = regexp.MustCompile(`^(?<transfer>\w{8}-\w{4}-\w{4}-\w{4}-\w{12})\s+(?<message>.*)`)
// tsDetailPattern1 matches the "in: Transfer start ..." detail line
// (thread, buffers, files, size, chunksize, streams, target-datarate, protocol, dest, sender-id).
tsDetailPattern1 = regexp.MustCompile(`in: Transfer start (?P<thread>\d+/\d+) buffers=(?P<buffers>\d+) files=(?P<files>\d+) size=(?P<size>[0-9.]+) MByte chunksize=(?P<chunksize>\d+) streams=(?P<streams>\d+) target-datarate=(?P<target_datarate>[0-9.]+) MByte/s protocol=(?P<protocol>\w+) dest=(?P<dest>\S+) sender-id=(?P<sender_id>\S+)`)
// tsDetailPattern2 matches "out: Start remote transfer to <target> request executed, duration=<seconds> s".
tsDetailPattern2 = regexp.MustCompile(`out: Start remote transfer to (?P<target>[^\s]+) request executed, duration=(?P<duration>[0-9.]+) s`)
// tsDetailPattern3 matches the "out: Transfer start ..." detail line
// (same leading fields as tsDetailPattern1, ending in src and receiver).
tsDetailPattern3 = regexp.MustCompile(`out: Transfer start (?P<thread>\d+/\d+) buffers=(?P<buffers>\d+) files=(?P<files>\d+) size=(?P<size>[0-9.]+) MByte chunksize=(?P<chunksize>\d+) streams=(?P<streams>\d+) target-datarate=(?P<target_datarate>[0-9.]+) MByte/s protocol=(?P<protocol>\w+) src=(?P<src>\S+) receiver=(?P<receiver>\S+)`)
// tsDetailPattern4 matches "out: Start transfer <thread>, src=... dest=... item[0]=... count=...".
tsDetailPattern4 = regexp.MustCompile(`out: Start transfer (?P<thread>\d+/\d+), src=(?P<src>[^ ]*) dest=(?P<dest>[^ ]*) item\[0\]=(?P<item0>[^ ]*) count=(?P<count>\d+)`)
)
// TSParser converts tixstream log lines into models.LogMessage values.
type TSParser struct{}

// Parse builds a LogMessage of type "service_log" for a single tixstream line.
//
// The line is first passed through helpers.ExtractSyslogHeader. Level,
// timestamp and message are taken from the structured prefix when present; a
// leading transfer id is stripped from the message. Transfer details found in
// the message are collected in a models.TSTransferInfo, which is attached as
// ServiceInformation only when the line marks a transfer start. Content with
// fewer than five tokens is returned without service information.
// Parse never returns an error.
func (p *TSParser) Parse(line string) (models.LogMessage, error) {
	newEntry := models.LogMessage{
		Service: "tixstream",
	}

	syslogFields, logContent := helpers.ExtractSyslogHeader(line)
	newEntry.Host = syslogFields.Hostname
	newEntry.Raw = line
	newEntry.Type = "service_log"

	if matches := tsServicePattern.FindStringSubmatch(logContent); len(matches) > 0 {
		// The log uses "date time"; the parser helper expects "dateTtime".
		timestamp, err := helpers.ParseRFC3339WithOptionalZ(strings.ReplaceAll(matches[2], " ", "T"))
		if err != nil {
			slog.Error("unable to parse time", "error", err)
		}
		// Fall back to the syslog header's timestamp when the line's own is unusable.
		if timestamp.IsZero() {
			timestamp = syslogFields.SysLogTimestamp
		}
		level := strings.TrimSpace(matches[1])
		newEntry.LogLevel = strings.ReplaceAll(level, "ACE_Message_Block", "")
		newEntry.Timestamp = timestamp
		newEntry.LogMessage = strings.TrimSpace(matches[3])
	} else {
		newEntry.LogMessage = logContent
	}

	var baseInfo models.TSTransferInfo
	var transferID string
	if idMatch := tsTransferIDPattern.FindStringSubmatch(newEntry.LogMessage); len(idMatch) > 0 {
		transferID = idMatch[1]
		newEntry.LogMessage = idMatch[2]
		// The remainder may be empty (e.g. an id followed only by whitespace);
		// guard the index so such a line cannot panic.
		if words := strings.Fields(idMatch[2]); len(words) > 0 {
			switch words[0] {
			case "in:":
				baseInfo.Direction = "incoming"
			case "out:":
				baseInfo.Direction = "outgoing"
			}
		}
	}

	// strings.Fields already splits on runs of whitespace, so the content
	// needs no normalisation before counting tokens.
	if len(strings.Fields(logContent)) < 5 {
		return newEntry, nil
	}

	if detail := tsDetailPattern1.FindStringSubmatch(newEntry.LogMessage); len(detail) > 0 {
		applyTSTransferStart(&baseInfo, detail)
		baseInfo.Dest = detail[9]
		baseInfo.SenderID = detail[10]
	}

	if detail := tsDetailPattern2.FindStringSubmatch(newEntry.LogMessage); len(detail) > 0 {
		baseInfo.Target = detail[1]
	}

	if detail := tsDetailPattern3.FindStringSubmatch(newEntry.LogMessage); len(detail) > 0 {
		applyTSTransferStart(&baseInfo, detail)
		baseInfo.Src = detail[9]
		baseInfo.Receiver = detail[10]
	}

	if detail := tsDetailPattern4.FindStringSubmatch(newEntry.LogMessage); len(detail) > 0 {
		baseInfo.Lane = tsLaneFromThread(detail[1])
		baseInfo.Src = detail[2]
		baseInfo.Dest = detail[3]
	}

	if strings.Contains(newEntry.LogMessage, "Transfer start") || strings.Contains(newEntry.LogMessage, "Transfer started,") {
		baseInfo.StartTime = newEntry.Timestamp
	}
	if strings.Contains(newEntry.LogMessage, "Transfer stopped local state=finished") {
		baseInfo.EndTime = newEntry.Timestamp
	}

	if transferID != "" {
		baseInfo.TransferID = transferID
	} else {
		baseInfo.TransferID = "no_transfer_id"
	}

	// Service information is only attached to lines that mark a transfer start.
	if !baseInfo.StartTime.IsZero() {
		newEntry.ServiceInformation = baseInfo
	}
	return newEntry, nil
}

// applyTSTransferStart copies the fields shared by the incoming and outgoing
// "Transfer start" detail lines (capture groups 1-8 of tsDetailPattern1 and
// tsDetailPattern3) into info.
func applyTSTransferStart(info *models.TSTransferInfo, m []string) {
	info.Lane = tsLaneFromThread(m[1])
	// Conversion errors are deliberately ignored: the patterns only admit
	// digits (and dots for the float fields), and a value that still fails to
	// convert is kept as whatever strconv returns, exactly as before.
	info.Buffers, _ = strconv.Atoi(m[2])
	info.FileCount, _ = strconv.Atoi(m[3])
	info.FileSizeMB, _ = strconv.ParseFloat(m[4], 64)
	info.ChunkSize, _ = strconv.Atoi(m[5])
	info.Streams, _ = strconv.Atoi(m[6])
	info.TargetDatarate, _ = strconv.ParseFloat(m[7], 64)
	info.Protocoll = m[8]
}

// tsLaneFromThread returns the number before the "/" in a "<a>/<b>" thread token.
func tsLaneFromThread(thread string) int {
	first, _, _ := strings.Cut(thread, "/")
	// Error ignored on purpose: the patterns only admit digits here.
	lane, _ := strconv.Atoi(first)
	return lane
}