watch-tool/service_monitor.go

346 lines
8 KiB
Go

package main
import (
"bufio"
"context"
"encoding/json"
"fmt"
"log/slog"
"os/exec"
"strconv"
"strings"
"time"
)
// ServiceMonitor follows the journal of a single service. Its methods build
// and run a journalctl command from the stored configuration.
type ServiceMonitor struct {
// config is the service configuration supplied to NewServiceMonitor.
config ServiceConfig
}
// NewServiceMonitor returns a ServiceMonitor that stores a copy of config.
// Nothing is started until Start is called.
func NewServiceMonitor(config ServiceConfig) *ServiceMonitor {
	monitor := new(ServiceMonitor)
	monitor.config = config
	return monitor
}
// Start runs journalctl in follow mode for the configured unit and forwards
// every successfully parsed journal line to out. It blocks until ctx is
// cancelled, journalctl's output ends, or reading the output fails.
//
// Delivery to out never blocks: if the channel cannot accept an entry right
// away the entry is dropped and a warning is logged. Blank lines are ignored
// and lines that fail to parse are logged and skipped.
//
// Start returns nil when it stops because ctx was cancelled. Otherwise it
// returns the error from creating the pipe, starting the command, scanning
// the output, or waiting for the command to exit. The child process is always
// waited for, so no zombie process is left behind.
func (sm *ServiceMonitor) Start(ctx context.Context, out chan<- LogEntry) error {
	// A journal record serialized as JSON can exceed bufio.Scanner's default
	// 64 KiB token limit, which would abort the whole stream.
	const (
		initialLineBuffer = 64 * 1024
		maxLineBytes      = 4 * 1024 * 1024
	)

	args := sm.buildJournalctlArgs()
	slog.Info("starting journalctl", "arguments", args)

	// CommandContext kills the process once ctx is done, so no separate
	// watcher goroutine is required.
	cmd := exec.CommandContext(ctx, args[0], args[1:]...)
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return fmt.Errorf("error StdoutPipe: %w", err)
	}
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("error start command: %w", err)
	}

	// reapAsync collects the child's exit status in the background. It is
	// used on paths where the process is being abandoned: waiting inline
	// could block the caller if the kill signal is refused or delayed.
	reapAsync := func() {
		go func() {
			// The exit status of an abandoned process carries no information.
			_ = cmd.Wait()
		}()
	}

	scanner := bufio.NewScanner(stdout)
	scanner.Buffer(make([]byte, 0, initialLineBuffer), maxLineBytes)
	parser := NewJournalEntryParser(sm.config.Name, sm.config.Service)

	for scanner.Scan() {
		if ctx.Err() != nil {
			reapAsync()
			return nil
		}

		line := scanner.Text()
		if strings.TrimSpace(line) == "" {
			continue
		}

		entry, err := parser.Parse(line)
		if err != nil {
			slog.Error("error parsing journal entry", "service", sm.config.Name, "error", err)
			continue
		}

		select {
		case out <- entry:
		case <-ctx.Done():
			reapAsync()
			return nil
		default:
			slog.Warn("Service-Log-Channel is full, entry dropped", "service", sm.config.Name)
		}
	}

	if ctx.Err() != nil {
		// The stream ended because of cancellation; the resulting
		// "signal: killed" exit status is expected and not reported.
		reapAsync()
		return nil
	}
	if err := scanner.Err(); err != nil {
		// Follow mode does not end on its own, so ask the process to stop
		// before giving up on it. Kill is best effort; its error adds nothing
		// to the scanner error reported below.
		_ = cmd.Process.Kill()
		reapAsync()
		return fmt.Errorf("scanner error: %w", err)
	}
	return cmd.Wait()
}
// buildJournalctlArgs assembles the full command line, program name first,
// used to follow the configured unit: sudo journalctl -f -o json -u <unit>.
// "--since" and "-p" are appended, in that order, only when the matching
// configuration value is non-empty.
func (sm *ServiceMonitor) buildJournalctlArgs() []string {
	cfg := sm.config
	argv := []string{"sudo", "journalctl", "-f", "-o", "json", "-u", cfg.Service}

	optional := [...]struct{ flag, value string }{
		{"--since", cfg.SinceTime},
		{"-p", cfg.Priority},
	}
	for _, opt := range optional {
		if opt.value == "" {
			continue
		}
		argv = append(argv, opt.flag, opt.value)
	}
	return argv
}
// JournalEntryParser turns journalctl JSON lines into LogEntry values for one
// service.
type JournalEntryParser struct {
	// serviceName is copied to LogEntry.Service and selects the
	// service-specific message parser; unitName is copied to LogEntry.Unit.
	serviceName, unitName string
}
// NewJournalEntryParser returns a parser that labels entries with serviceName
// and unitName.
func NewJournalEntryParser(serviceName, unitName string) *JournalEntryParser {
	parser := new(JournalEntryParser)
	parser.serviceName = serviceName
	parser.unitName = unitName
	return parser
}
// Parse converts one line of journalctl JSON output into a LogEntry.
//
// The entry is created with NewLogEntry("service_log") and labelled with the
// parser's service and unit names. The following journal keys are read:
// __REALTIME_TIMESTAMP (microseconds since the Unix epoch; the current time is
// used when it is missing or unparsable), MESSAGE, PRIORITY (also stored under
// Fields["priority_name"] as a readable name), _PID, _BOOT_ID and _MACHINE_ID,
// plus the keys copied by extractSystemdFields. The unmodified line is kept in
// Raw, and the entry is finally passed through parseServiceSpecific.
//
// An error is returned only when jsonLine is not a valid JSON object.
func (jep *JournalEntryParser) Parse(jsonLine string) (LogEntry, error) {
	var journalData map[string]any
	if err := json.Unmarshal([]byte(jsonLine), &journalData); err != nil {
		return LogEntry{}, fmt.Errorf("JSON unmarshal error: %w", err)
	}

	entry := NewLogEntry("service_log")
	// Fields is written to below; writing to a nil map would panic.
	if entry.Fields == nil {
		entry.Fields = make(map[string]any)
	}
	entry.Service = jep.serviceName
	entry.Unit = jep.unitName

	if tsStr, ok := journalData["__REALTIME_TIMESTAMP"].(string); ok {
		if micros, err := strconv.ParseInt(tsStr, 10, 64); err == nil {
			entry.Timestamp = time.UnixMicro(micros)
		}
	}
	if entry.Timestamp.IsZero() {
		entry.Timestamp = time.Now()
	}

	// MESSAGE is not always a JSON string: journalctl emits an array of byte
	// values when the text contains non-printable or non-UTF-8 bytes.
	if msg, ok := journalFieldString(journalData["MESSAGE"]); ok {
		entry.Message = msg
	}
	if priority, ok := journalData["PRIORITY"].(string); ok {
		entry.Priority = priority
		entry.Fields["priority_name"] = jep.getPriorityName(priority)
	}
	if pidStr, ok := journalData["_PID"].(string); ok {
		if pid, err := strconv.Atoi(pidStr); err == nil {
			entry.PID = pid
		}
	}

	jep.extractSystemdFields(journalData, &entry)

	if bootID, ok := journalData["_BOOT_ID"].(string); ok {
		entry.BootID = bootID
	}
	if machineID, ok := journalData["_MACHINE_ID"].(string); ok {
		entry.MachineID = machineID
	}

	entry.Raw = jsonLine
	entry = jep.parseServiceSpecific(entry)
	return entry, nil
}

// journalFieldString returns the text of a decoded journal field. It accepts a
// JSON string, or a JSON array whose elements are all integers in 0..255,
// which is decoded as raw bytes. Any other shape (null, missing key, array of
// strings, ...) yields ok == false.
func journalFieldString(value any) (text string, ok bool) {
	switch v := value.(type) {
	case string:
		return v, true
	case []any:
		raw := make([]byte, 0, len(v))
		for _, item := range v {
			n, isNum := item.(float64)
			if !isNum || n < 0 || n > 255 || n != float64(uint8(n)) {
				return "", false
			}
			raw = append(raw, uint8(n))
		}
		return string(raw), true
	default:
		return "", false
	}
}
// getPriorityName maps a priority given as a decimal digit string ("0".."7")
// to its readable name. Any other input yields "unknown".
func (jep *JournalEntryParser) getPriorityName(priority string) string {
	switch priority {
	case "0":
		return "emergency"
	case "1":
		return "alert"
	case "2":
		return "critical"
	case "3":
		return "error"
	case "4":
		return "warning"
	case "5":
		return "notice"
	case "6":
		return "info"
	case "7":
		return "debug"
	default:
		return "unknown"
	}
}
// extractSystemdFields copies a fixed set of journal metadata keys from
// journalData into entry.Fields. Each key is stored under its lower-cased name
// with one leading underscore removed (for example "_HOSTNAME" becomes
// "hostname"); values are copied as decoded, without conversion. Keys that are
// absent from journalData are skipped.
//
// entry.Fields is allocated on demand, so an entry whose Fields map is nil is
// accepted; it stays nil when none of the keys is present.
func (jep *JournalEntryParser) extractSystemdFields(journalData map[string]any, entry *LogEntry) {
	systemdFields := []string{
		"_SYSTEMD_UNIT", "_SYSTEMD_USER_UNIT", "_SYSTEMD_SLICE",
		"_BOOT_ID", "_MACHINE_ID", "_HOSTNAME", "_TRANSPORT",
		"_CAP_EFFECTIVE", "_SELINUX_CONTEXT", "_AUDIT_SESSION",
		"_AUDIT_LOGINUID", "_GID", "_UID", "_COMM", "_EXE",
		"_CMDLINE", "_SYSTEMD_CGROUP", "_SYSTEMD_SESSION",
		"_SYSTEMD_OWNER_UID", "_SOURCE_REALTIME_TIMESTAMP",
	}
	for _, field := range systemdFields {
		value, ok := journalData[field]
		if !ok {
			continue
		}
		// Writing to a nil map panics, so create it before the first write.
		if entry.Fields == nil {
			entry.Fields = make(map[string]any)
		}
		esFieldName := strings.ToLower(strings.TrimPrefix(field, "_"))
		entry.Fields[esFieldName] = value
	}
}
// parseServiceSpecific hands entry to the message parser registered for the
// parser's service name ("tixstream", "transfer-job-manager" or "nginx") and
// returns its result. For every other service name entry is returned as is.
func (jep *JournalEntryParser) parseServiceSpecific(entry LogEntry) LogEntry {
	var handler func(LogEntry) LogEntry
	switch jep.serviceName {
	case "tixstream":
		handler = parseTixstreamService
	case "transfer-job-manager":
		handler = parseTJMService
	case "nginx":
		handler = parseNginxService
	}
	if handler == nil {
		return entry
	}
	return handler(entry)
}
// parseTixstreamService splits a tixstream message into structured fields.
//
// The message is tokenised on whitespace and must contain at least five
// tokens: log level, date, time, transfer ID and one or more payload tokens.
// Shorter messages are returned unchanged. Otherwise these keys are written
// to Fields (the map is created when nil):
//
//	log_level, message_date, message_time, transfer_id
//	log_message         payload tokens joined with spaces
//	log_type            chosen from the first payload token
//	transfer_direction  "incoming" / "outgoing" for "in:" / "out:"
//	transfer_info       remaining payload tokens joined with ";"
//	queue_stats         same, written instead of transfer_info for "queue-stats:"
//
// When the first payload token is none of "in:", "out:", "queue-stats:" or
// "transfer:", log_type is "log_message" and transfer_info holds the whole
// payload. The returned entry shares its Fields map with the argument when
// that map was non-nil.
func parseTixstreamService(entry LogEntry) LogEntry {
	newEntry := entry

	// strings.Fields already collapses every run of whitespace, so no
	// separate normalisation of the message is needed.
	parts := strings.Fields(entry.Message)
	if len(parts) < 5 {
		return newEntry
	}
	if newEntry.Fields == nil {
		newEntry.Fields = make(map[string]any)
	}
	fields := newEntry.Fields

	// len(parts) >= 5 guarantees at least one payload token.
	info := parts[4:]
	fields["log_level"] = parts[0]
	fields["message_date"] = parts[1]
	fields["message_time"] = parts[2]
	fields["transfer_id"] = parts[3]
	fields["log_message"] = strings.Join(info, " ")

	rest := strings.Join(info[1:], ";")
	switch info[0] {
	case "in:":
		fields["log_type"] = "direction_info"
		fields["transfer_direction"] = "incoming"
		fields["transfer_info"] = rest
	case "out:":
		fields["log_type"] = "direction_info"
		fields["transfer_direction"] = "outgoing"
		fields["transfer_info"] = rest
	case "queue-stats:":
		fields["log_type"] = "queue_stats"
		fields["queue_stats"] = rest
	case "transfer:":
		fields["log_type"] = "transfer_info"
		fields["transfer_info"] = rest
	default:
		fields["log_type"] = "log_message"
		fields["transfer_info"] = strings.Join(info, ";")
	}
	return newEntry
}
// parseTJMService splits a transfer-job-manager message into structured
// fields.
//
// Every "---" sequence is removed and the message is tokenised on whitespace.
// Messages with fewer than four tokens are returned unchanged. Otherwise the
// first three tokens are stored as message_date, message_time and log_level,
// and the remaining tokens, joined with spaces, as message.
//
// The remainder is then read positionally (after rewriting "[ ]" to "[]"):
// token 1 is correlation_id, token 2 username (omitted when it is "[]"),
// token 3 thread_id, token 4 java_class, and tokens from index 6 on form
// log_message. Tokens 0 and 5 are not stored. If token 6 contains "-out" or
// "-in", transfer_direction is set to "outgoing" or "incoming" and that token
// is left out of log_message.
//
// Remainders with fewer than six tokens do not fit this layout; for them only
// the four basic fields are written. The returned entry shares its Fields map
// with the argument when that map was non-nil.
func parseTJMService(entry LogEntry) LogEntry {
	// Number of tokens the positional layout needs (indices 0..5).
	const minStructuredTokens = 6

	newEntry := entry

	// strings.Fields collapses whitespace runs, including the gaps left
	// behind by the removed "---" separators.
	parts := strings.Fields(strings.ReplaceAll(entry.Message, "---", ""))
	if len(parts) < 4 {
		return newEntry
	}
	if newEntry.Fields == nil {
		newEntry.Fields = make(map[string]any)
	}
	fields := newEntry.Fields

	message := strings.Join(parts[3:], " ")
	fields["log_level"] = parts[2]
	fields["message_date"] = parts[0]
	fields["message_time"] = parts[1]
	fields["message"] = message

	// An empty bracket pair arrives as two tokens; merge it so the
	// positions of the following tokens stay stable.
	tokens := strings.Fields(strings.ReplaceAll(message, "[ ]", "[]"))
	if len(tokens) < minStructuredTokens {
		// Indexing the fixed positions below would panic on short lines.
		return newEntry
	}

	if username := tokens[2]; username != "[]" {
		fields["username"] = username
	}
	fields["correlation_id"] = tokens[1]
	fields["thread_id"] = tokens[3]
	fields["java_class"] = tokens[4]

	rest := tokens[6:]
	if len(rest) > 0 {
		switch {
		case strings.Contains(rest[0], "-out"):
			fields["transfer_direction"] = "outgoing"
			rest = rest[1:]
		case strings.Contains(rest[0], "-in"):
			fields["transfer_direction"] = "incoming"
			rest = rest[1:]
		}
	}
	fields["log_message"] = strings.Join(rest, " ")
	return newEntry
}
// parseNginxService is the service-specific hook for "nginx" entries. It
// currently extracts nothing and returns entry unchanged.
func parseNginxService(entry LogEntry) LogEntry {
return entry
}