feat: implement drain3 based generic log-parser

This commit is contained in:
Patryk Hegenberg 2026-01-18 16:51:44 +01:00
parent 1d1568e3ee
commit 5af49f926a
17 changed files with 612 additions and 220 deletions

View file

@ -2,12 +2,12 @@ package main
import (
"bufio"
"codeberg.org/pata1704/drain3"
"context"
"encoding/json"
"fmt"
"log/slog"
"os/exec"
"regexp"
"strconv"
"strings"
"time"
@ -16,14 +16,18 @@ import (
)
type ServiceMonitor struct {
config ServiceConfig
hostname string
config ServiceConfig
hostname string
drainConfig *drain3.Config
stateDir string
}
func NewServiceMonitor(config ServiceConfig, hostname string) *ServiceMonitor {
func NewServiceMonitor(config ServiceConfig, hostname string, drainCfg *drain3.Config, stateDir string) *ServiceMonitor {
return &ServiceMonitor{
config: config,
hostname: hostname,
config: config,
hostname: hostname,
drainConfig: drainCfg,
stateDir: stateDir,
}
}
@ -52,7 +56,8 @@ func (sm *ServiceMonitor) Start(ctx context.Context, out chan<- models.LogMessag
}
}()
parser := NewJournalEntryParser(sm.config.Name, sm.config.Service, sm.hostname)
jParser := NewJournalEntryParser(sm.config.Name, sm.config.Service, sm.hostname, sm.drainConfig, sm.stateDir)
defer jParser.Close()
for scanner.Scan() {
select {
@ -66,7 +71,7 @@ func (sm *ServiceMonitor) Start(ctx context.Context, out chan<- models.LogMessag
continue
}
entry, err := parser.Parse(line)
entry, err := jParser.Parse(line)
if err != nil {
slog.Error("error parsing journal entry", "service", sm.config.Name, "error", err)
continue
@ -112,16 +117,38 @@ type JournalEntryParser struct {
serviceName string
unitName string
hostname string
innerParser parser.Parser
}
func NewJournalEntryParser(serviceName, unitName, hostname string) *JournalEntryParser {
func NewJournalEntryParser(serviceName, unitName, hostname string, drainCfg *drain3.Config, stateDir string) *JournalEntryParser {
pCfg := parser.ParserConfig{
ServiceName: serviceName,
LogType: "custom",
Hostname: hostname,
DrainConfig: drainCfg,
StateDir: stateDir,
}
inner, err := parser.New(pCfg)
if err != nil {
slog.Error("Failed to create inner parser for service", "service", serviceName, "error", err)
}
return &JournalEntryParser{
serviceName: serviceName,
unitName: unitName,
hostname: hostname,
innerParser: inner,
}
}
func (jep *JournalEntryParser) Close() error {
if jep.innerParser != nil {
return jep.innerParser.Close()
}
return nil
}
func (jep *JournalEntryParser) Parse(jsonLine string) (models.LogMessage, error) {
var journalData map[string]any
if err := json.Unmarshal([]byte(jsonLine), &journalData); err != nil {
@ -170,11 +197,25 @@ func (jep *JournalEntryParser) Parse(jsonLine string) (models.LogMessage, error)
entry.Raw = jsonLine
entry = jep.parseServiceSpecific(entry)
if jep.innerParser != nil && entry.LogMessage != "" {
parsedMsg, err := jep.innerParser.Parse(entry.LogMessage)
if err == nil {
jep.mergeEntries(&entry, &parsedMsg)
}
}
return entry, nil
}
func (jep *JournalEntryParser) mergeEntries(target *models.LogMessage, source *models.LogMessage) {
for k, v := range source.Fields {
target.Fields[k] = v
}
if source.LogLevel != "" {
target.LogLevel = source.LogLevel
}
}
func (jep *JournalEntryParser) getPriorityName(priority string) string {
priorityNames := map[string]string{
"0": "emergency",
@ -213,29 +254,3 @@ func (jep *JournalEntryParser) extractSystemdFields(journalData map[string]any,
}
}
}
func (jep *JournalEntryParser) parseServiceSpecific(entry models.LogMessage) models.LogMessage {
logParser, err := parser.New(jep.serviceName, "custom", jep.hostname)
if err != nil {
slog.Error("cannot get service specific parser")
return entry
}
entry, err = logParser.Parse(entry.LogMessage)
return entry
}
var (
amServicePattern = regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z)\s+(\w+)\s+(\d+)\s+---\s+\[\s*([^\]]*)\]\s+([\w\.]+)\s*:\s*(.*)$`)
tccServicePattern = regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z)\s+(\w+)\s+(\d+)\s+---\s+\[\s*([^\]]*)\]\s+([\w\.]+)\s*:\s*(.*)$`)
tjmServicePattern = regexp.MustCompile(`^(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\s+(?<level>\S+)\s+(?<pid>\d+).*?\[(?<collatation_id>[^\]]*)\]\s+\[(?<username>[^\]]*)\]\s+\[(?<thread>[^\]]*)\]\s+(?<class>.*?)\s+:\s+(?<message>.*)`)
tjmTransferNamePattern = regexp.MustCompile(`^(\d{8}T\d{6}-[A-Za-z0-9]+-.+?-(?:in|out)) ?: (.*)$`)
tsServicePattern = regexp.MustCompile(`^(?<level>\S+)\s+(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6})\s+(?<message>.*)`)
tsTransferIDPattern = regexp.MustCompile(`^(?<transfer>\w{8}-\w{4}-\w{4}-\w{4}-\w{12})\s+(?<message>.*)`)
tjmTransferIDPattern1 = regexp.MustCompile(`(?P<transfer>\w{8}-\w{4}-\w{4}-\w{4}-\w{12}).*?(?P<message>.*)`)
tjmTransferIDPattern2 = regexp.MustCompile(`(?P<before>.*)(?P<transfer>\w{8}-\w{4}-\w{4}-\w{4}-\w{12}).*?(?P<message>.*)`)
tsDetailPattern1 = regexp.MustCompile(`in: Transfer start (?P<thread>\d+/\d+) buffers=(?P<buffers>\d+) files=(?P<files>\d+) size=(?P<size>[0-9.]+) MByte chunksize=(?P<chunksize>\d+) streams=(?P<streams>\d+) target-datarate=(?P<target_datarate>[0-9.]+) MByte/s protocol=(?P<protocol>\w+) dest=(?P<dest>\S+) sender-id=(?P<sender_id>\S+)`)
tsDetailPattern2 = regexp.MustCompile(`out: Start remote transfer to (?P<target>[^\s]+) request executed, duration=(?P<duration>[0-9.]+) s`)
tsDetailPattern3 = regexp.MustCompile(`out: Transfer start (?P<thread>\d+/\d+) buffers=(?P<buffers>\d+) files=(?P<files>\d+) size=(?P<size>[0-9.]+) MByte chunksize=(?P<chunksize>\d+) streams=(?P<streams>\d+) target-datarate=(?P<target_datarate>[0-9.]+) MByte/s protocol=(?P<protocol>\w+) src=(?P<src>\S+) receiver=(?P<receiver>\S+)`)
tsDetailPattern4 = regexp.MustCompile(`out: Start transfer (?P<thread>\d+/\d+), src=(?P<src>[^ ]*) dest=(?P<dest>[^ ]*) item\[0\]=(?P<item0>[^ ]*) count=(?P<count>\d+)`)
nginxAccessPattern = regexp.MustCompile(`^(\S+)\s+\S+\s+(\S+)\s+\[([^\]]+)\]\s+"([^"]+)"\s+(\d+)\s+(\d+|-)\s*(?:"([^"]*)"\s+"([^"]*)")?`)
)