watch-tool/service_monitor.go

513 lines
15 KiB
Go

package main
import (
"bufio"
"context"
"encoding/json"
"fmt"
"log/slog"
"os/exec"
"regexp"
"strconv"
"strings"
"time"
)
// ServiceMonitor follows the journal of a single service via journalctl and
// converts each record into a LogEntry.
type ServiceMonitor struct {
	// config supplies the display name, the unit to follow and the optional
	// journalctl filters (since time, priority).
	config ServiceConfig
}

// NewServiceMonitor returns a ServiceMonitor that uses the given configuration.
func NewServiceMonitor(config ServiceConfig) *ServiceMonitor {
	monitor := new(ServiceMonitor)
	monitor.config = config
	return monitor
}
// Start launches journalctl in follow mode for the configured unit and
// forwards every successfully parsed record to out until the command ends or
// ctx is cancelled.
//
// Sending is non-blocking: if out cannot take an entry immediately, the entry
// is dropped and a warning is logged. Blank lines and lines that fail to
// parse are skipped.
//
// Start always reaps the child process before returning. It returns nil when
// it stopped because ctx was cancelled, a wrapped error when the pipe or the
// process could not be set up or reading the output failed, and otherwise the
// result of waiting for the command.
func (sm *ServiceMonitor) Start(ctx context.Context, out chan<- LogEntry) error {
	// Upper bound for a single journal line; the scanner default of 64 KiB
	// is too small for large records.
	const maxLineBytes = 1 << 20

	args := sm.buildJournalctlArgs()
	slog.Info("starting journalctl", "arguments", args)

	// CommandContext kills the process as soon as ctx is done, so no extra
	// watchdog goroutine is needed.
	cmd := exec.CommandContext(ctx, args[0], args[1:]...)
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return fmt.Errorf("error StdoutPipe: %w", err)
	}
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("error start command: %w", err)
	}

	scanner := bufio.NewScanner(stdout)
	scanner.Buffer(make([]byte, 0, 64*1024), maxLineBytes)
	parser := NewJournalEntryParser(sm.config.Name, sm.config.Service)

scan:
	for scanner.Scan() {
		if ctx.Err() != nil {
			break
		}
		line := scanner.Text()
		if strings.TrimSpace(line) == "" {
			continue
		}
		entry, err := parser.Parse(line)
		if err != nil {
			slog.Error("error parsing journal entry", "service", sm.config.Name, "error", err)
			continue
		}
		select {
		case out <- entry:
		case <-ctx.Done():
			break scan
		default:
			// Deliberately best-effort: never block the reader on a slow consumer.
			slog.Warn("Service-Log-Channel is full, entry dropped", "service", sm.config.Name)
		}
	}

	scanErr := scanner.Err()
	if scanErr != nil || ctx.Err() != nil {
		// Reading stopped while journalctl may still be running; terminate it
		// so that Wait below cannot block. The error is ignored because the
		// process may already have exited.
		_ = cmd.Process.Kill()
	}
	// Wait must only be called after all reads from the pipe are done; it
	// releases the pipe and reaps the child.
	waitErr := cmd.Wait()

	if ctx.Err() != nil {
		// Cancellation is the normal way to stop a follower, not a failure.
		return nil
	}
	if scanErr != nil {
		return fmt.Errorf("scanner error: %w", scanErr)
	}
	return waitErr
}
// buildJournalctlArgs assembles the command line used by Start: journalctl
// run through sudo, following the configured unit with JSON output. The
// --since and -p filters are appended only when the corresponding
// configuration values are non-empty.
func (sm *ServiceMonitor) buildJournalctlArgs() []string {
	cfg := &sm.config

	argv := []string{"sudo", "journalctl", "-f", "-o", "json", "-u", cfg.Service}
	if since := cfg.SinceTime; since != "" {
		argv = append(argv, "--since", since)
	}
	if prio := cfg.Priority; prio != "" {
		argv = append(argv, "-p", prio)
	}
	return argv
}
// JournalEntryParser converts journalctl JSON lines into LogEntry values for
// one service.
type JournalEntryParser struct {
	// serviceName is copied into LogEntry.Service and selects the
	// service-specific message parser.
	serviceName string
	// unitName is copied into LogEntry.Unit.
	unitName string
}

// NewJournalEntryParser returns a parser for the given service and unit names.
func NewJournalEntryParser(serviceName, unitName string) *JournalEntryParser {
	parser := new(JournalEntryParser)
	parser.serviceName, parser.unitName = serviceName, unitName
	return parser
}
// Parse converts one line of journalctl JSON output into a LogEntry.
//
// The entry is tagged with the parser's service and unit names. Timestamp is
// taken from __REALTIME_TIMESTAMP (microseconds since the Unix epoch) and
// falls back to the current time when that field is missing or malformed.
// MESSAGE, PRIORITY, _PID, _BOOT_ID and _MACHINE_ID are copied when present,
// selected systemd fields are stored via extractSystemdFields, the original
// line is kept in Raw, and finally the service-specific message parser runs.
//
// An error is returned only when jsonLine is not a valid JSON object.
func (jep *JournalEntryParser) Parse(jsonLine string) (LogEntry, error) {
	var journalData map[string]any
	if err := json.Unmarshal([]byte(jsonLine), &journalData); err != nil {
		return LogEntry{}, fmt.Errorf("JSON unmarshal error: %w", err)
	}

	entry := NewLogEntry("service_log")
	entry.Service = jep.serviceName
	entry.Unit = jep.unitName

	if tsStr, ok := journalData["__REALTIME_TIMESTAMP"].(string); ok {
		if usec, err := strconv.ParseInt(tsStr, 10, 64); err == nil {
			// UnixMicro avoids the manual (overflow-prone) *1000 conversion.
			entry.Timestamp = time.UnixMicro(usec)
		}
	}
	if entry.Timestamp.IsZero() {
		entry.Timestamp = time.Now()
	}

	// MESSAGE is not always a JSON string: journalctl emits an array of byte
	// values when the payload contains non-printable or non-UTF-8 bytes
	// (for example ANSI colour codes).
	if msg, ok := journalFieldString(journalData["MESSAGE"]); ok {
		entry.LogMessage = msg
	}

	if priority, ok := journalData["PRIORITY"].(string); ok {
		entry.Priority = priority
		entry.PriorityName = jep.getPriorityName(priority)
	}

	if pidStr, ok := journalData["_PID"].(string); ok {
		if pid, err := strconv.Atoi(pidStr); err == nil {
			entry.PID = pid
			entry.SyslogInfo.ProcessInfo = strconv.Itoa(pid)
		}
	}

	jep.extractSystemdFields(journalData, &entry)

	if bootID, ok := journalData["_BOOT_ID"].(string); ok {
		entry.BootID = bootID
	}
	if machineID, ok := journalData["_MACHINE_ID"].(string); ok {
		entry.MachineID = machineID
	}

	entry.Raw = jsonLine
	entry = jep.parseServiceSpecific(entry)
	return entry, nil
}

// journalFieldString returns the text of a decoded journal JSON field.
// A JSON string is returned unchanged. A JSON array whose elements are all
// integers in the range 0-255 is treated as raw bytes and converted to a
// string. Any other shape (null, nested arrays, arrays of strings, ...)
// yields ok == false.
func journalFieldString(v any) (text string, ok bool) {
	switch val := v.(type) {
	case string:
		return val, true
	case []any:
		raw := make([]byte, 0, len(val))
		for _, item := range val {
			n, isNum := item.(float64)
			if !isNum || n < 0 || n > 255 || float64(byte(n)) != n {
				return "", false
			}
			raw = append(raw, byte(n))
		}
		return string(raw), true
	default:
		return "", false
	}
}
// getPriorityName translates a journal PRIORITY value ("0" through "7") into
// its textual name. Any other input yields "unknown".
func (jep *JournalEntryParser) getPriorityName(priority string) string {
	switch priority {
	case "0":
		return "emergency"
	case "1":
		return "alert"
	case "2":
		return "critical"
	case "3":
		return "error"
	case "4":
		return "warning"
	case "5":
		return "notice"
	case "6":
		return "info"
	case "7":
		return "debug"
	}
	return "unknown"
}
// extractSystemdFields copies a fixed set of journal metadata fields from
// journalData into entry.SyslogInfo.Fields. Each key is stored in lower case
// with its leading underscore removed (e.g. "_SYSTEMD_UNIT" becomes
// "systemd_unit"); fields absent from journalData are skipped.
//
// NOTE(review): assumes entry.SyslogInfo.Fields is already a non-nil map,
// presumably allocated by NewLogEntry - confirm, since writing to a nil map
// panics.
func (jep *JournalEntryParser) extractSystemdFields(journalData map[string]any, entry *LogEntry) {
	wanted := [...]string{
		"_SYSTEMD_UNIT", "_SYSTEMD_USER_UNIT", "_SYSTEMD_SLICE",
		"_BOOT_ID", "_MACHINE_ID", "_HOSTNAME", "_TRANSPORT",
		"_CAP_EFFECTIVE", "_SELINUX_CONTEXT", "_AUDIT_SESSION",
		"_AUDIT_LOGINUID", "_GID", "_UID", "_COMM", "_EXE",
		"_CMDLINE", "_SYSTEMD_CGROUP", "_SYSTEMD_SESSION",
		"_SYSTEMD_OWNER_UID", "_SOURCE_REALTIME_TIMESTAMP",
	}

	for _, key := range wanted {
		raw, present := journalData[key]
		if !present {
			continue
		}
		target := strings.ToLower(strings.TrimPrefix(key, "_"))
		entry.SyslogInfo.Fields[target] = raw
	}
}
// parseServiceSpecific runs the message parser that belongs to the parser's
// service name. Entries of services without a dedicated parser are returned
// unchanged.
func (jep *JournalEntryParser) parseServiceSpecific(entry LogEntry) LogEntry {
	var handler func(LogEntry) LogEntry

	switch jep.serviceName {
	case "tixstream":
		handler = parseTixstreamService
	case "transfer-job-manager":
		handler = parseTJMService
	case "nginx":
		handler = parseNginxService
	case "access-manager":
		handler = parseAMService
	case "tixel-control-center":
		handler = parseTCCService
	}

	if handler == nil {
		return entry
	}
	return handler(entry)
}
// Regular expressions used by the service-specific message parsers.
//
// Named groups are written as (?P<name>...) throughout: the (?<name>...)
// spelling is only accepted by Go 1.22 and newer and would make MustCompile
// panic during package initialisation on older toolchains. Group names and
// positions are unchanged.
var (
	// amServicePattern matches access-manager lines:
	// "<UTC timestamp> <level> <pid> --- [<thread>] <logger> : <message>".
	amServicePattern = regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z)\s+(\w+)\s+(\d+)\s+---\s+\[\s*([^\]]*)\]\s+([\w\.]+)\s*:\s*(.*)$`)
	// tccServicePattern matches tixel-control-center lines; same layout as amServicePattern.
	tccServicePattern = regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z)\s+(\w+)\s+(\d+)\s+---\s+\[\s*([^\]]*)\]\s+([\w\.]+)\s*:\s*(.*)$`)
	// tjmServicePattern matches transfer-job-manager lines. Groups in order:
	// time, level, pid, collatation_id, username, thread, class, message.
	tjmServicePattern = regexp.MustCompile(`^(?P<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\s+(?P<level>\S+)\s+(?P<pid>\d+).*?\[(?P<collatation_id>[^\]]*)\]\s+\[(?P<username>[^\]]*)\]\s+\[(?P<thread>[^\]]*)\]\s+(?P<class>.*?)\s+:\s+(?P<message>.*)`)
	// tjmTransferNamePattern matches a leading transfer name ending in "-in" or "-out", followed by ": <message>".
	tjmTransferNamePattern = regexp.MustCompile(`^(\d{8}T\d{6}-[A-Za-z0-9]+-[A-Za-z]+-(?:in|out)) ?: (.*)$`)
	// tsServicePattern matches tixstream lines: "<level> <timestamp with microseconds> <message>".
	tsServicePattern = regexp.MustCompile(`^(?P<level>\S+)\s+(?P<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6})\s+(?P<message>.*)`)
	// tsTransferIDPattern matches a leading UUID-shaped transfer ID followed by the remaining message.
	tsTransferIDPattern = regexp.MustCompile(`^(?P<transfer>\w{8}-\w{4}-\w{4}-\w{4}-\w{12})\s+(?P<message>.*)`)
	// tjmTransferIDPattern1 finds a UUID-shaped transfer ID anywhere in the message.
	tjmTransferIDPattern1 = regexp.MustCompile(`(?P<transfer>\w{8}-\w{4}-\w{4}-\w{4}-\w{12}).*?(?P<message>.*)`)
	// tjmTransferIDPattern2 is like tjmTransferIDPattern1 but also captures the text before the ID;
	// because that prefix group is greedy, the captured ID is the last one on the matched line.
	tjmTransferIDPattern2 = regexp.MustCompile(`(?P<before>.*)(?P<transfer>\w{8}-\w{4}-\w{4}-\w{4}-\w{12}).*?(?P<message>.*)`)
	// tsDetailPattern1 matches the incoming "Transfer start" summary (dest / sender-id variant).
	tsDetailPattern1 = regexp.MustCompile(`in: Transfer start (?P<thread>\d+/\d+) buffers=(?P<buffers>\d+) files=(?P<files>\d+) size=(?P<size>[0-9.]+) MByte chunksize=(?P<chunksize>\d+) streams=(?P<streams>\d+) target-datarate=(?P<target_datarate>[0-9.]+) MByte/s protocol=(?P<protocol>\w+) dest=(?P<dest>\S+) sender-id=(?P<sender_id>\S+)`)
	// tsDetailPattern2 matches the outgoing "Start remote transfer" confirmation.
	tsDetailPattern2 = regexp.MustCompile(`out: Start remote transfer to (?P<target>[^\s]+) request executed, duration=(?P<duration>[0-9.]+) s`)
	// tsDetailPattern3 matches the outgoing "Transfer start" summary (src / receiver variant).
	tsDetailPattern3 = regexp.MustCompile(`out: Transfer start (?P<thread>\d+/\d+) buffers=(?P<buffers>\d+) files=(?P<files>\d+) size=(?P<size>[0-9.]+) MByte chunksize=(?P<chunksize>\d+) streams=(?P<streams>\d+) target-datarate=(?P<target_datarate>[0-9.]+) MByte/s protocol=(?P<protocol>\w+) src=(?P<src>\S+) receiver=(?P<receiver>\S+)`)
	// tsDetailPattern4 matches the outgoing "Start transfer" line with source, destination and item count.
	tsDetailPattern4 = regexp.MustCompile(`out: Start transfer (?P<thread>\d+/\d+), src=(?P<src>[^ ]*) dest=(?P<dest>[^ ]*) item\[0\]=(?P<item0>[^ ]*) count=(?P<count>\d+)`)
	// nginxAccessPattern matches an nginx access-log line: client, user, time, request,
	// status, bytes and optionally the quoted referer and user agent.
	nginxAccessPattern = regexp.MustCompile(`^(\S+)\s+\S+\s+(\S+)\s+\[([^\]]+)\]\s+"([^"]+)"\s+(\d+)\s+(\d+|-)\s*(?:"([^"]*)"\s+"([^"]*)")?`)
)
// parseTixstreamService extracts structured data from a tixstream log message.
//
// Steps, in order:
//  1. If the message starts with "<level> <timestamp> <text>", the level is
//     stored in LogLevel, the message is reduced to <text>, and the timestamp
//     is used only when the entry has none yet.
//  2. A leading UUID-shaped transfer ID is stripped from the message; the
//     first word after it ("in:" / "out:") determines the direction.
//  3. Messages with fewer than five words are returned without further
//     analysis.
//  4. The known "Transfer start" / "Start transfer" layouts are matched to
//     fill in lane, sizes, endpoints and similar details.
//
// NOTE(review): BaseInformation is attached only when a start time was
// recorded, i.e. for messages containing "Transfer start". Details gathered
// from other messages (end time, remote target, "Start transfer" source and
// destination) are therefore discarded - confirm that this is intended.
func parseTixstreamService(entry LogEntry) LogEntry {
	newEntry := entry
	var baseInfo TSBaseInfo

	if m := tsServicePattern.FindStringSubmatch(newEntry.LogMessage); len(m) > 0 {
		newEntry.LogLevel = strings.TrimSpace(m[1])
		if newEntry.Timestamp.IsZero() {
			// The log uses "date time"; RFC 3339 needs "dateTtime".
			ts, err := parseRFC3339WithOptionalZ(strings.ReplaceAll(m[2], " ", "T"))
			if err != nil {
				slog.Error("cant parse time string", "error", err)
			} else {
				newEntry.Timestamp = ts
			}
		}
		newEntry.LogMessage = strings.TrimSpace(m[3])
	}

	var transferID string
	if m := tsTransferIDPattern.FindStringSubmatch(newEntry.LogMessage); len(m) > 0 {
		transferID = m[1]
		newEntry.LogMessage = m[2]
		// The text after the ID may be empty, so check before reading its
		// first word.
		if words := strings.Fields(m[2]); len(words) > 0 {
			switch words[0] {
			case "in:":
				baseInfo.Direction = "incoming"
			case "out:":
				baseInfo.Direction = "outgoing"
			}
		}
	}

	msg := newEntry.LogMessage
	if len(strings.Fields(msg)) < 5 {
		return newEntry
	}

	if m := tsDetailPattern1.FindStringSubmatch(msg); len(m) > 0 {
		applyTransferStartDetails(&baseInfo, m)
		baseInfo.Destination = m[9]
		baseInfo.Sender = m[10]
	}
	if m := tsDetailPattern2.FindStringSubmatch(msg); len(m) > 0 {
		baseInfo.Target = m[1]
	}
	if m := tsDetailPattern3.FindStringSubmatch(msg); len(m) > 0 {
		applyTransferStartDetails(&baseInfo, m)
		baseInfo.Source = m[9]
		baseInfo.Receiver = m[10]
	}
	if m := tsDetailPattern4.FindStringSubmatch(msg); len(m) > 0 {
		// The pattern guarantees digits; only an out-of-range value can fail.
		lane, _ := strconv.Atoi(strings.Split(m[1], "/")[0])
		baseInfo.Lane = lane
		baseInfo.Source = m[2]
		baseInfo.Destination = m[3]
	}

	// "Transfer started," also contains "Transfer start", so one check
	// covers both spellings.
	if strings.Contains(msg, "Transfer start") {
		baseInfo.StartTime = newEntry.Timestamp
	}
	if strings.Contains(msg, "Transfer stopped local state=finished") {
		baseInfo.EndTime = newEntry.Timestamp
	}

	baseInfo.TransferID = "no_transfer_id"
	if transferID != "" {
		baseInfo.TransferID = transferID
	}

	if !baseInfo.StartTime.IsZero() {
		newEntry.BaseInformation = baseInfo
	}
	return newEntry
}

// applyTransferStartDetails copies the captures shared by the incoming and
// outgoing "Transfer start" patterns into info: lane (the number before the
// "/" of the thread capture), buffers, files, size, chunk size, streams,
// target data rate and protocol (groups 1-8 of m).
func applyTransferStartDetails(info *TSBaseInfo, m []string) {
	// Conversion errors are ignored on purpose: the patterns only admit
	// digits (and dots for the float fields), so a failure leaves the
	// best-effort value returned by strconv.
	lane, _ := strconv.Atoi(strings.Split(m[1], "/")[0])
	buffers, _ := strconv.Atoi(m[2])
	fileCount, _ := strconv.Atoi(m[3])
	fileSize, _ := strconv.ParseFloat(m[4], 64)
	chunkSize, _ := strconv.Atoi(m[5])
	streams, _ := strconv.Atoi(m[6])
	datarate, _ := strconv.ParseFloat(m[7], 64)

	info.Lane = lane
	info.Buffers = buffers
	info.FileCount = fileCount
	info.FileSize = fileSize
	info.ChunkSize = chunkSize
	info.Streams = streams
	info.TargetDatarate = datarate
	info.Protocoll = m[8]
}
// parseTJMService extracts structured data from a transfer-job-manager log
// message.
//
// Messages with fewer than four words (not counting the "---" separator) are
// returned unchanged. Otherwise, when the message matches the service's line
// layout, level, process ID, correlation ID, user name, thread and Java class
// are extracted and the message is reduced to its text part; the embedded
// timestamp is used only when the entry has none yet. A leading transfer name
// is stripped and determines the direction from its "-in" / "-out" suffix.
// The transfer ID is the UUID-shaped token found in the message, falling back
// to the transfer name and finally to "no_transfer_id". BaseInformation is
// always set for messages that pass the length check.
func parseTJMService(entry LogEntry) LogEntry {
	newEntry := entry
	var baseInfo TJMBaseInfo
	logContent := entry.LogMessage

	if len(strings.Fields(strings.ReplaceAll(logContent, "---", ""))) < 4 {
		return newEntry
	}

	// Groups: 1 time, 2 level, 3 pid, 4 correlation ID, 5 user name,
	// 6 thread, 7 class, 8 message.
	if m := tjmServicePattern.FindStringSubmatch(logContent); len(m) > 0 {
		if newEntry.Timestamp.IsZero() {
			// The log uses "date time"; RFC 3339 needs "dateTtime".
			ts, err := parseRFC3339WithOptionalZ(strings.ReplaceAll(m[1], " ", "T"))
			if err != nil {
				slog.Error("cant parse time string", "error", err)
			} else {
				newEntry.Timestamp = ts
			}
		}
		newEntry.LogLevel = strings.TrimSpace(m[2])
		newEntry.LogMessage = strings.TrimSpace(m[8])
		baseInfo = TJMBaseInfo{
			ProcessID:     strings.TrimSpace(m[3]),
			CorrelationID: strings.TrimSpace(m[4]),
			Username:      strings.TrimSpace(m[5]),
			ThreadID:      strings.TrimSpace(m[6]),
			JavaClass:     strings.TrimSpace(m[7]),
		}
	}

	var transferName string
	if m := tjmTransferNamePattern.FindStringSubmatch(newEntry.LogMessage); len(m) > 0 {
		transferName = m[1]
		newEntry.LogMessage = m[2]
		// The pattern guarantees the name ends in "-in" or "-out"; checking
		// the suffix avoids misreading names that merely contain "-in..." or
		// "-out..." in the middle.
		switch {
		case strings.HasSuffix(transferName, "-in"):
			baseInfo.Direction = "incoming"
		case strings.HasSuffix(transferName, "-out"):
			baseInfo.Direction = "outgoing"
		}
	}

	// tjmTransferIDPattern2 matches whenever a UUID-shaped token exists and
	// always took precedence over tjmTransferIDPattern1, so one lookup
	// suffices.
	var transferID string
	if m := tjmTransferIDPattern2.FindStringSubmatch(newEntry.LogMessage); len(m) > 0 {
		transferID = m[2]
	}

	switch {
	case transferID != "":
		baseInfo.TransferID = transferID
	case transferName != "":
		baseInfo.TransferID = transferName
	default:
		baseInfo.TransferID = "no_transfer_id"
	}

	newEntry.BaseInformation = baseInfo
	return newEntry
}
// parseAMService extracts structured data from an access-manager log message
// of the form "<timestamp> <level> <pid> --- [<thread>] <logger> : <text>".
// Messages that do not match are returned unchanged. On a match the level and
// text replace LogLevel and LogMessage, process ID, thread and logger name
// are stored in BaseInformation, and the embedded timestamp is applied only
// when the entry has none yet.
func parseAMService(entry LogEntry) LogEntry {
	groups := amServicePattern.FindStringSubmatch(strings.TrimSpace(entry.LogMessage))
	if len(groups) != 7 {
		return entry
	}

	parsed := entry
	if parsed.Timestamp.IsZero() {
		stamp, err := parseRFC3339WithOptionalZ(strings.ReplaceAll(groups[1], " ", "T"))
		if err != nil {
			slog.Error("cant parse time string", "error", err)
		}
		parsed.Timestamp = stamp
	}

	parsed.LogLevel = groups[2]
	parsed.LogMessage = groups[6]
	parsed.BaseInformation = AMBaseInfo{
		ProcessID:  groups[3],
		ThreadID:   strings.TrimSpace(groups[4]),
		LoggerName: groups[5],
	}
	return parsed
}
// parseTCCService extracts structured data from a tixel-control-center log
// message of the form
// "<timestamp> <level> <pid> --- [<thread>] <logger> : <text>".
//
// Surrounding whitespace is trimmed before matching, as parseAMService does
// for the identical layout; the pattern is anchored, so untrimmed input with
// leading whitespace would never match. Messages that do not match are
// returned unchanged. On a match the level and text replace LogLevel and
// LogMessage, process ID, thread and logger name are stored in
// BaseInformation, and the embedded timestamp is applied only when the entry
// has none yet.
func parseTCCService(entry LogEntry) LogEntry {
	newEntry := entry

	matches := tccServicePattern.FindStringSubmatch(strings.TrimSpace(newEntry.LogMessage))
	if len(matches) != 7 {
		return newEntry
	}

	if newEntry.Timestamp.IsZero() {
		ts, err := parseRFC3339WithOptionalZ(matches[1])
		if err != nil {
			slog.Error("cant parse time string", "error", err)
		} else {
			newEntry.Timestamp = ts
		}
	}

	newEntry.LogLevel = matches[2]
	newEntry.LogMessage = matches[6]
	newEntry.BaseInformation = TCCBaseInfo{
		ProcessID:  matches[3],
		ThreadID:   strings.TrimSpace(matches[4]),
		LoggerName: matches[5],
	}
	return newEntry
}
// parseNginxService extracts structured data from an nginx access-log line.
//
// Messages that do not match the access-log pattern are returned unchanged.
// On a match, client address, remote user, request line, status code and
// byte count are stored in BaseInformation, together with referer and user
// agent when present. The request line is additionally split into method,
// URI and HTTP version when it has at least three parts. A byte count of "-"
// is recorded as 0.
func parseNginxService(entry LogEntry) LogEntry {
	newEntry := entry

	matches := nginxAccessPattern.FindStringSubmatch(strings.TrimSpace(entry.LogMessage))
	if len(matches) < 7 {
		return newEntry
	}

	statusCode, err := strconv.ParseInt(matches[5], 10, 64)
	if err != nil {
		slog.Error("cant parse statuscode", "error", err)
	}

	// The pattern admits "-" in place of a byte count; that is not a parse
	// failure, so it is not logged as one.
	var bytesSent int64
	if matches[6] != "-" {
		if bytesSent, err = strconv.ParseInt(matches[6], 10, 64); err != nil {
			slog.Error("cant parse bytessend", "error", err)
		}
	}

	baseInfo := NGinXBaseInfo{
		ClientIP:   matches[1],
		RemoteUser: matches[2],
		Request:    matches[4],
		StatusCode: int(statusCode),
		BytesSend:  int(bytesSent),
	}
	if len(matches) > 7 && matches[7] != "" {
		baseInfo.Referer = matches[7]
	}
	if len(matches) > 8 && matches[8] != "" {
		baseInfo.UserAgent = matches[8]
	}
	if requestParts := strings.Fields(matches[4]); len(requestParts) >= 3 {
		baseInfo.HTTPMethod = requestParts[0]
		baseInfo.RequestURI = requestParts[1]
		baseInfo.HTTPVersion = requestParts[2]
	}

	// Attach the extracted data; without this the parsed fields are lost.
	newEntry.BaseInformation = baseInfo
	return newEntry
}
// parseRFC3339WithOptionalZ parses timeStr as an RFC 3339 timestamp with
// optional fractional seconds. When the string neither ends in "Z" nor has a
// "+" or "-" within its last six bytes (where a numeric zone offset would
// be), "Z" is appended first, so zone-less timestamps are read as UTC.
//
// Inputs of any length are accepted; strings that are not valid timestamps,
// including the empty string, yield the error from time.Parse.
func parseRFC3339WithOptionalZ(timeStr string) (time.Time, error) {
	// Only the tail can hold a zone offset. Clamp the window so that short
	// inputs do not cause an out-of-range slice.
	tail := timeStr
	if len(tail) > 6 {
		tail = tail[len(tail)-6:]
	}
	if !strings.HasSuffix(timeStr, "Z") && !strings.ContainsAny(tail, "+-") {
		timeStr += "Z"
	}
	return time.Parse(time.RFC3339Nano, timeStr)
}