refactor: use parser package for systemd logs

This commit is contained in:
Patryk Hegenberg 2025-09-25 00:01:34 +02:00
parent 9aa1b7384d
commit e468b3a0e3
13 changed files with 897 additions and 275 deletions

View file

@ -12,6 +12,7 @@ import (
"strings"
"time"
"tixel_watch/models"
"tixel_watch/parser"
)
type ServiceMonitor struct {
@ -207,20 +208,13 @@ func (jep *JournalEntryParser) extractSystemdFields(journalData map[string]any,
}
func (jep *JournalEntryParser) parseServiceSpecific(entry models.LogMessage) models.LogMessage {
switch jep.serviceName {
case "tixstream":
return parseTixstreamService(entry)
case "transfer-job-manager":
return parseTJMService(entry)
case "nginx":
return parseNginxService(entry)
case "access-manager":
return parseAMService(entry)
case "tixel-control-center":
return parseTCCService(entry)
default:
logParser, err := parser.New(jep.serviceName, "custom")
if err != nil {
slog.Error("cannot get service specific parser")
return entry
}
entry, err = logParser.Parse(entry.LogMessage)
return entry
}
var (
@ -239,283 +233,283 @@ var (
nginxAccessPattern = regexp.MustCompile(`^(\S+)\s+\S+\s+(\S+)\s+\[([^\]]+)\]\s+"([^"]+)"\s+(\d+)\s+(\d+|-)\s*(?:"([^"]*)"\s+"([^"]*)")?`)
)
func parseTixstreamService(entry models.LogMessage) models.LogMessage {
newEntry := entry
var baseInfo models.TSTransferInfo
// func parseTixstreamService(entry models.LogMessage) models.LogMessage {
// newEntry := entry
// var baseInfo models.TSTransferInfo
matches := tsServicePattern.FindStringSubmatch(newEntry.LogMessage)
if len(matches) > 0 {
timestamp := strings.Join(strings.Split(matches[2], " "), "T")
newEntry.LogLevel = strings.TrimSpace(matches[1])
if newEntry.Timestamp.IsZero() {
timeParsed, err := parseRFC3339WithOptionalZ(timestamp)
if err != nil {
slog.Error("cant parse time string", "error", err)
}
newEntry.Timestamp = timeParsed
}
newEntry.LogMessage = strings.TrimSpace(matches[3])
}
trNameMatch := tsTransferIDPattern.FindStringSubmatch(newEntry.LogMessage)
var transferID string
if len(trNameMatch) > 0 {
transferID = trNameMatch[1]
newEntry.LogMessage = trNameMatch[2]
split := strings.Fields(trNameMatch[2])
switch split[0] {
case "in:":
baseInfo.Direction = "incoming"
case "out:":
baseInfo.Direction = "outgoing"
}
}
// matches := tsServicePattern.FindStringSubmatch(newEntry.LogMessage)
// if len(matches) > 0 {
// timestamp := strings.Join(strings.Split(matches[2], " "), "T")
// newEntry.LogLevel = strings.TrimSpace(matches[1])
// if newEntry.Timestamp.IsZero() {
// timeParsed, err := parseRFC3339WithOptionalZ(timestamp)
// if err != nil {
// slog.Error("cant parse time string", "error", err)
// }
// newEntry.Timestamp = timeParsed
// }
// newEntry.LogMessage = strings.TrimSpace(matches[3])
// }
// trNameMatch := tsTransferIDPattern.FindStringSubmatch(newEntry.LogMessage)
// var transferID string
// if len(trNameMatch) > 0 {
// transferID = trNameMatch[1]
// newEntry.LogMessage = trNameMatch[2]
// split := strings.Fields(trNameMatch[2])
// switch split[0] {
// case "in:":
// baseInfo.Direction = "incoming"
// case "out:":
// baseInfo.Direction = "outgoing"
// }
// }
msg := strings.ReplaceAll(newEntry.LogMessage, " ", " ")
parts := strings.Fields(msg)
// msg := strings.ReplaceAll(newEntry.LogMessage, " ", " ")
// parts := strings.Fields(msg)
if len(parts) < 5 {
return newEntry
}
tsDetail := tsDetailPattern1.FindStringSubmatch(newEntry.LogMessage)
if len(tsDetail) > 0 {
threadInt, _ := strconv.Atoi(strings.Split(tsDetail[1], "/")[0])
buffersInt, _ := strconv.Atoi(tsDetail[2])
fileCountInt, _ := strconv.Atoi(tsDetail[3])
chunkSizeInt, _ := strconv.Atoi(tsDetail[5])
streamsInt, _ := strconv.Atoi(tsDetail[6])
datarateFloat, _ := strconv.ParseFloat(tsDetail[7], 64)
fileSizeFloat, _ := strconv.ParseFloat(tsDetail[4], 64)
baseInfo.Lane = threadInt
baseInfo.Buffers = buffersInt
baseInfo.FileCount = fileCountInt
baseInfo.FileSizeMB = fileSizeFloat
baseInfo.ChunkSize = chunkSizeInt
baseInfo.Streams = streamsInt
baseInfo.TargetDatarate = datarateFloat
baseInfo.Protocoll = tsDetail[8]
baseInfo.Dest = tsDetail[9]
baseInfo.SenderID = tsDetail[10]
}
tsDetail = tsDetailPattern2.FindStringSubmatch(newEntry.LogMessage)
if len(tsDetail) > 0 {
baseInfo.Target = tsDetail[1]
}
tsDetail = tsDetailPattern3.FindStringSubmatch(newEntry.LogMessage)
if len(tsDetail) > 0 {
threadInt, _ := strconv.Atoi(strings.Split(tsDetail[1], "/")[0])
buffersInt, _ := strconv.Atoi(tsDetail[2])
fileCountInt, _ := strconv.Atoi(tsDetail[3])
fileSizeFloat, _ := strconv.ParseFloat(tsDetail[4], 64)
chunkSizeInt, _ := strconv.Atoi(tsDetail[5])
streamsInt, _ := strconv.Atoi(tsDetail[6])
datarateFloat, _ := strconv.ParseFloat(tsDetail[7], 64)
baseInfo.Lane = threadInt
baseInfo.Buffers = buffersInt
baseInfo.FileCount = fileCountInt
baseInfo.FileSizeMB = fileSizeFloat
baseInfo.ChunkSize = chunkSizeInt
baseInfo.Streams = streamsInt
baseInfo.TargetDatarate = datarateFloat
baseInfo.Protocoll = tsDetail[8]
baseInfo.Src = tsDetail[9]
baseInfo.Receiver = tsDetail[10]
}
tsDetail = tsDetailPattern4.FindStringSubmatch(newEntry.LogMessage)
if len(tsDetail) > 0 {
threadInt, _ := strconv.Atoi(strings.Split(tsDetail[1], "/")[0])
baseInfo.Lane = threadInt
baseInfo.Src = tsDetail[2]
baseInfo.Dest = tsDetail[3]
}
if strings.Contains(newEntry.LogMessage, "Transfer start") || strings.Contains(newEntry.LogMessage, "Transfer started,") {
baseInfo.StartTime = newEntry.Timestamp
} else {
baseInfo.StartTime = time.Now()
}
if strings.Contains(newEntry.LogMessage, "Transfer stopped local state=finished") {
baseInfo.EndTime = newEntry.Timestamp
} else {
baseInfo.EndTime = baseInfo.StartTime
}
if transferID != "" {
baseInfo.TransferID = transferID
} else {
baseInfo.TransferID = "no_transfer_id"
}
newEntry.ServiceInformation = baseInfo
return newEntry
}
// if len(parts) < 5 {
// return newEntry
// }
// tsDetail := tsDetailPattern1.FindStringSubmatch(newEntry.LogMessage)
// if len(tsDetail) > 0 {
// threadInt, _ := strconv.Atoi(strings.Split(tsDetail[1], "/")[0])
// buffersInt, _ := strconv.Atoi(tsDetail[2])
// fileCountInt, _ := strconv.Atoi(tsDetail[3])
// chunkSizeInt, _ := strconv.Atoi(tsDetail[5])
// streamsInt, _ := strconv.Atoi(tsDetail[6])
// datarateFloat, _ := strconv.ParseFloat(tsDetail[7], 64)
// fileSizeFloat, _ := strconv.ParseFloat(tsDetail[4], 64)
// baseInfo.Lane = threadInt
// baseInfo.Buffers = buffersInt
// baseInfo.FileCount = fileCountInt
// baseInfo.FileSizeMB = fileSizeFloat
// baseInfo.ChunkSize = chunkSizeInt
// baseInfo.Streams = streamsInt
// baseInfo.TargetDatarate = datarateFloat
// baseInfo.Protocoll = tsDetail[8]
// baseInfo.Dest = tsDetail[9]
// baseInfo.SenderID = tsDetail[10]
// }
// tsDetail = tsDetailPattern2.FindStringSubmatch(newEntry.LogMessage)
// if len(tsDetail) > 0 {
// baseInfo.Target = tsDetail[1]
// }
// tsDetail = tsDetailPattern3.FindStringSubmatch(newEntry.LogMessage)
// if len(tsDetail) > 0 {
// threadInt, _ := strconv.Atoi(strings.Split(tsDetail[1], "/")[0])
// buffersInt, _ := strconv.Atoi(tsDetail[2])
// fileCountInt, _ := strconv.Atoi(tsDetail[3])
// fileSizeFloat, _ := strconv.ParseFloat(tsDetail[4], 64)
// chunkSizeInt, _ := strconv.Atoi(tsDetail[5])
// streamsInt, _ := strconv.Atoi(tsDetail[6])
// datarateFloat, _ := strconv.ParseFloat(tsDetail[7], 64)
// baseInfo.Lane = threadInt
// baseInfo.Buffers = buffersInt
// baseInfo.FileCount = fileCountInt
// baseInfo.FileSizeMB = fileSizeFloat
// baseInfo.ChunkSize = chunkSizeInt
// baseInfo.Streams = streamsInt
// baseInfo.TargetDatarate = datarateFloat
// baseInfo.Protocoll = tsDetail[8]
// baseInfo.Src = tsDetail[9]
// baseInfo.Receiver = tsDetail[10]
// }
// tsDetail = tsDetailPattern4.FindStringSubmatch(newEntry.LogMessage)
// if len(tsDetail) > 0 {
// threadInt, _ := strconv.Atoi(strings.Split(tsDetail[1], "/")[0])
// baseInfo.Lane = threadInt
// baseInfo.Src = tsDetail[2]
// baseInfo.Dest = tsDetail[3]
// }
// if strings.Contains(newEntry.LogMessage, "Transfer start") || strings.Contains(newEntry.LogMessage, "Transfer started,") {
// baseInfo.StartTime = newEntry.Timestamp
// } else {
// baseInfo.StartTime = time.Now()
// }
// if strings.Contains(newEntry.LogMessage, "Transfer stopped local state=finished") {
// baseInfo.EndTime = newEntry.Timestamp
// } else {
// baseInfo.EndTime = baseInfo.StartTime
// }
// if transferID != "" {
// baseInfo.TransferID = transferID
// } else {
// baseInfo.TransferID = "no_transfer_id"
// }
// newEntry.ServiceInformation = baseInfo
// return newEntry
// }
func parseTJMService(entry models.LogMessage) models.LogMessage {
newEntry := entry
var baseInfo models.TJMTransferInfo
// func parseTJMService(entry models.LogMessage) models.LogMessage {
// newEntry := entry
// var baseInfo models.TJMTransferInfo
logContent := entry.LogMessage
msg := strings.TrimSpace(logContent)
msg = strings.ReplaceAll(msg, " ", " ")
msg = strings.ReplaceAll(msg, "---", "")
msg = strings.ReplaceAll(msg, " ", " ")
parts := strings.Fields(msg)
if len(parts) < 4 {
return newEntry
}
matches := tjmServicePattern.FindStringSubmatch(logContent)
if len(matches) > 0 {
timestamp := strings.Join(strings.Split(matches[2], " "), "T")
newEntry.LogLevel = strings.TrimSpace(matches[1])
if newEntry.Timestamp.IsZero() {
timeParsed, err := parseRFC3339WithOptionalZ(timestamp)
if err != nil {
slog.Error("cant parse time string", "error", err)
}
newEntry.Timestamp = timeParsed
}
newEntry.LogLevel = strings.TrimSpace(matches[2])
newEntry.LogMessage = strings.TrimSpace(matches[8])
baseInfo = models.TJMTransferInfo{
ProcessID: strings.TrimSpace(matches[3]),
CorrelationID: strings.TrimSpace(matches[4]),
Username: strings.TrimSpace(matches[5]),
ThreadID: strings.TrimSpace(matches[6]),
JavaClass: strings.TrimSpace(matches[7]),
}
} else {
newEntry.LogMessage = logContent
}
trNameMatch := tjmTransferNamePattern.FindStringSubmatch(newEntry.LogMessage)
var transferName string
var transferID string
if len(trNameMatch) > 0 {
transferName = trNameMatch[1]
newEntry.LogMessage = trNameMatch[2]
if strings.Contains(trNameMatch[1], "-in") {
baseInfo.Direction = "incoming"
}
if strings.Contains(trNameMatch[1], "-out") {
baseInfo.Direction = "outgoing"
}
}
trIDMatch := tjmTransferIDPattern1.FindStringSubmatch(newEntry.LogMessage)
if len(trIDMatch) > 0 {
transferID = trIDMatch[1]
}
trIDMatch = tjmTransferIDPattern2.FindStringSubmatch(newEntry.LogMessage)
if len(trIDMatch) > 0 {
transferID = trIDMatch[2]
}
if transferID != "" {
baseInfo.TransferID = transferID
} else if transferName != "" {
baseInfo.TransferID = transferName
} else {
baseInfo.TransferID = "no_transfer_id"
}
baseInfo.StartTime = newEntry.Timestamp
baseInfo.StartTime = newEntry.Timestamp
newEntry.ServiceInformation = baseInfo
// logContent := entry.LogMessage
// msg := strings.TrimSpace(logContent)
// msg = strings.ReplaceAll(msg, " ", " ")
// msg = strings.ReplaceAll(msg, "---", "")
// msg = strings.ReplaceAll(msg, " ", " ")
// parts := strings.Fields(msg)
// if len(parts) < 4 {
// return newEntry
// }
// matches := tjmServicePattern.FindStringSubmatch(logContent)
// if len(matches) > 0 {
// timestamp := strings.Join(strings.Split(matches[2], " "), "T")
// newEntry.LogLevel = strings.TrimSpace(matches[1])
// if newEntry.Timestamp.IsZero() {
// timeParsed, err := parseRFC3339WithOptionalZ(timestamp)
// if err != nil {
// slog.Error("cant parse time string", "error", err)
// }
// newEntry.Timestamp = timeParsed
// }
// newEntry.LogLevel = strings.TrimSpace(matches[2])
// newEntry.LogMessage = strings.TrimSpace(matches[8])
// baseInfo = models.TJMTransferInfo{
// ProcessID: strings.TrimSpace(matches[3]),
// CorrelationID: strings.TrimSpace(matches[4]),
// Username: strings.TrimSpace(matches[5]),
// ThreadID: strings.TrimSpace(matches[6]),
// JavaClass: strings.TrimSpace(matches[7]),
// }
// } else {
// newEntry.LogMessage = logContent
// }
// trNameMatch := tjmTransferNamePattern.FindStringSubmatch(newEntry.LogMessage)
// var transferName string
// var transferID string
// if len(trNameMatch) > 0 {
// transferName = trNameMatch[1]
// newEntry.LogMessage = trNameMatch[2]
// if strings.Contains(trNameMatch[1], "-in") {
// baseInfo.Direction = "incoming"
// }
// if strings.Contains(trNameMatch[1], "-out") {
// baseInfo.Direction = "outgoing"
// }
// }
// trIDMatch := tjmTransferIDPattern1.FindStringSubmatch(newEntry.LogMessage)
// if len(trIDMatch) > 0 {
// transferID = trIDMatch[1]
// }
// trIDMatch = tjmTransferIDPattern2.FindStringSubmatch(newEntry.LogMessage)
// if len(trIDMatch) > 0 {
// transferID = trIDMatch[2]
// }
// if transferID != "" {
// baseInfo.TransferID = transferID
// } else if transferName != "" {
// baseInfo.TransferID = transferName
// } else {
// baseInfo.TransferID = "no_transfer_id"
// }
// baseInfo.StartTime = newEntry.Timestamp
// baseInfo.StartTime = newEntry.Timestamp
// newEntry.ServiceInformation = baseInfo
return newEntry
}
// return newEntry
// }
func parseAMService(entry models.LogMessage) models.LogMessage {
newEntry := entry
logContent := newEntry.LogMessage
// func parseAMService(entry models.LogMessage) models.LogMessage {
// newEntry := entry
// logContent := newEntry.LogMessage
matches := amServicePattern.FindStringSubmatch(strings.TrimSpace(logContent))
if len(matches) != 7 {
return newEntry
}
timestampStr := strings.Join(strings.Split(matches[1], " "), "T")
if newEntry.Timestamp.IsZero() {
timeParsed, err := parseRFC3339WithOptionalZ(timestampStr)
if err != nil {
slog.Error("cant parse time string", "error", err)
}
newEntry.Timestamp = timeParsed
}
baseInfo := models.AMBaseInfo{
ProcessID: matches[3],
ThreadID: strings.TrimSpace(matches[4]),
LoggerName: matches[5],
}
newEntry.LogLevel = matches[2]
newEntry.LogMessage = matches[6]
newEntry.ServiceInformation = baseInfo
// matches := amServicePattern.FindStringSubmatch(strings.TrimSpace(logContent))
// if len(matches) != 7 {
// return newEntry
// }
// timestampStr := strings.Join(strings.Split(matches[1], " "), "T")
// if newEntry.Timestamp.IsZero() {
// timeParsed, err := parseRFC3339WithOptionalZ(timestampStr)
// if err != nil {
// slog.Error("cant parse time string", "error", err)
// }
// newEntry.Timestamp = timeParsed
// }
// baseInfo := models.AMBaseInfo{
// ProcessID: matches[3],
// ThreadID: strings.TrimSpace(matches[4]),
// LoggerName: matches[5],
// }
// newEntry.LogLevel = matches[2]
// newEntry.LogMessage = matches[6]
// newEntry.ServiceInformation = baseInfo
return newEntry
}
// return newEntry
// }
func parseTCCService(entry models.LogMessage) models.LogMessage {
newEntry := entry
logContent := newEntry.LogMessage
// func parseTCCService(entry models.LogMessage) models.LogMessage {
// newEntry := entry
// logContent := newEntry.LogMessage
matches := tccServicePattern.FindStringSubmatch(logContent)
if len(matches) != 7 {
return newEntry
}
timestampStr := strings.Join(strings.Split(matches[1], " "), "T")
if newEntry.Timestamp.IsZero() {
timeParsed, err := parseRFC3339WithOptionalZ(timestampStr)
if err != nil {
slog.Error("cant parse time string", "error", err)
}
newEntry.Timestamp = timeParsed
}
baseInfo := models.TCCBaseInfo{
ProcessID: matches[3],
ThreadID: strings.TrimSpace(matches[4]),
LoggerName: matches[5],
}
newEntry.LogLevel = matches[2]
newEntry.LogMessage = matches[6]
newEntry.ServiceInformation = baseInfo
// matches := tccServicePattern.FindStringSubmatch(logContent)
// if len(matches) != 7 {
// return newEntry
// }
// timestampStr := strings.Join(strings.Split(matches[1], " "), "T")
// if newEntry.Timestamp.IsZero() {
// timeParsed, err := parseRFC3339WithOptionalZ(timestampStr)
// if err != nil {
// slog.Error("cant parse time string", "error", err)
// }
// newEntry.Timestamp = timeParsed
// }
// baseInfo := models.TCCBaseInfo{
// ProcessID: matches[3],
// ThreadID: strings.TrimSpace(matches[4]),
// LoggerName: matches[5],
// }
// newEntry.LogLevel = matches[2]
// newEntry.LogMessage = matches[6]
// newEntry.ServiceInformation = baseInfo
return newEntry
}
// return newEntry
// }
func parseNginxService(entry models.LogMessage) models.LogMessage {
newEntry := entry
// func parseNginxService(entry models.LogMessage) models.LogMessage {
// newEntry := entry
matches := nginxAccessPattern.FindStringSubmatch(strings.TrimSpace(entry.LogMessage))
if len(matches) < 7 {
return newEntry
}
statusCode, err := strconv.ParseInt(matches[5], 10, 64)
if err != nil {
slog.Error("cant parse statuscode", "error", err)
}
bytesSend, err := strconv.ParseInt(matches[6], 10, 64)
if err != nil {
slog.Error("cant parse bytessend", "error", err)
}
baseInfo := models.NGinXBaseInfo{
ClientIP: matches[1],
RemoteUser: matches[2],
Request: matches[4],
StatusCode: int(statusCode),
BytesSend: int(bytesSend),
}
// matches := nginxAccessPattern.FindStringSubmatch(strings.TrimSpace(entry.LogMessage))
// if len(matches) < 7 {
// return newEntry
// }
// statusCode, err := strconv.ParseInt(matches[5], 10, 64)
// if err != nil {
// slog.Error("cant parse statuscode", "error", err)
// }
// bytesSend, err := strconv.ParseInt(matches[6], 10, 64)
// if err != nil {
// slog.Error("cant parse bytessend", "error", err)
// }
// baseInfo := models.NGinXBaseInfo{
// ClientIP: matches[1],
// RemoteUser: matches[2],
// Request: matches[4],
// StatusCode: int(statusCode),
// BytesSend: int(bytesSend),
// }
if len(matches) > 7 && matches[7] != "" {
baseInfo.Referer = matches[7]
}
if len(matches) > 8 && matches[8] != "" {
baseInfo.UserAgent = matches[8]
}
// if len(matches) > 7 && matches[7] != "" {
// baseInfo.Referer = matches[7]
// }
// if len(matches) > 8 && matches[8] != "" {
// baseInfo.UserAgent = matches[8]
// }
if requestParts := strings.Fields(matches[4]); len(requestParts) >= 3 {
baseInfo.HTTPMethod = requestParts[0]
baseInfo.RequestURI = requestParts[1]
baseInfo.HTTPVersion = requestParts[2]
}
newEntry.ServiceInformation = baseInfo
// if requestParts := strings.Fields(matches[4]); len(requestParts) >= 3 {
// baseInfo.HTTPMethod = requestParts[0]
// baseInfo.RequestURI = requestParts[1]
// baseInfo.HTTPVersion = requestParts[2]
// }
// newEntry.ServiceInformation = baseInfo
return newEntry
}
// return newEntry
// }
func parseRFC3339WithOptionalZ(timeStr string) (time.Time, error) {
if !strings.HasSuffix(timeStr, "Z") && !strings.ContainsAny(timeStr[len(timeStr)-6:], "+-") {
timeStr += "Z"
}
return time.Parse(time.RFC3339Nano, timeStr)
}
// func parseRFC3339WithOptionalZ(timeStr string) (time.Time, error) {
// if !strings.HasSuffix(timeStr, "Z") && !strings.ContainsAny(timeStr[len(timeStr)-6:], "+-") {
// timeStr += "Z"
// }
// return time.Parse(time.RFC3339Nano, timeStr)
// }