525 lines
16 KiB
Go
525 lines
16 KiB
Go
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"os/exec"
|
|
"regexp"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
type ServiceMonitor struct {
|
|
config ServiceConfig
|
|
}
|
|
|
|
func NewServiceMonitor(config ServiceConfig) *ServiceMonitor {
|
|
return &ServiceMonitor{
|
|
config: config,
|
|
}
|
|
}
|
|
|
|
func (sm *ServiceMonitor) Start(ctx context.Context, out chan<- LogEntry) error {
|
|
args := sm.buildJournalctlArgs()
|
|
|
|
slog.Info("starting journalctl", "arguments", args)
|
|
|
|
cmd := exec.CommandContext(ctx, args[0], args[1:]...)
|
|
|
|
stdout, err := cmd.StdoutPipe()
|
|
if err != nil {
|
|
return fmt.Errorf("error StdoutPipe: %w", err)
|
|
}
|
|
|
|
if err := cmd.Start(); err != nil {
|
|
return fmt.Errorf("error start command: %w", err)
|
|
}
|
|
|
|
scanner := bufio.NewScanner(stdout)
|
|
|
|
go func() {
|
|
<-ctx.Done()
|
|
if cmd.Process != nil {
|
|
cmd.Process.Kill()
|
|
}
|
|
}()
|
|
|
|
parser := NewJournalEntryParser(sm.config.Name, sm.config.Service)
|
|
|
|
for scanner.Scan() {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
default:
|
|
}
|
|
|
|
line := scanner.Text()
|
|
if strings.TrimSpace(line) == "" {
|
|
continue
|
|
}
|
|
|
|
entry, err := parser.Parse(line)
|
|
if err != nil {
|
|
slog.Error("error parsing journal entry", "service", sm.config.Name, "error", err)
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case out <- entry:
|
|
case <-ctx.Done():
|
|
return nil
|
|
default:
|
|
slog.Warn("Service-Log-Channel is full, entry dropped", "service", sm.config.Name)
|
|
}
|
|
}
|
|
|
|
if err := scanner.Err(); err != nil {
|
|
return fmt.Errorf("scanner error: %w", err)
|
|
}
|
|
|
|
return cmd.Wait()
|
|
}
|
|
|
|
func (sm *ServiceMonitor) buildJournalctlArgs() []string {
|
|
args := []string{
|
|
"sudo",
|
|
"journalctl",
|
|
"-f",
|
|
"-o", "json",
|
|
"-u", sm.config.Service,
|
|
}
|
|
|
|
if sm.config.SinceTime != "" {
|
|
args = append(args, "--since", sm.config.SinceTime)
|
|
}
|
|
|
|
if sm.config.Priority != "" {
|
|
args = append(args, "-p", sm.config.Priority)
|
|
}
|
|
|
|
return args
|
|
}
|
|
|
|
type JournalEntryParser struct {
|
|
serviceName string
|
|
unitName string
|
|
}
|
|
|
|
func NewJournalEntryParser(serviceName, unitName string) *JournalEntryParser {
|
|
return &JournalEntryParser{
|
|
serviceName: serviceName,
|
|
unitName: unitName,
|
|
}
|
|
}
|
|
|
|
func (jep *JournalEntryParser) Parse(jsonLine string) (LogEntry, error) {
|
|
var journalData map[string]any
|
|
if err := json.Unmarshal([]byte(jsonLine), &journalData); err != nil {
|
|
return LogEntry{}, fmt.Errorf("JSON unmarshal error: %w", err)
|
|
}
|
|
|
|
entry := NewLogEntry("service_log")
|
|
entry.Service = jep.serviceName
|
|
entry.Unit = jep.unitName
|
|
|
|
if tsStr, ok := journalData["__REALTIME_TIMESTAMP"].(string); ok {
|
|
if tsInt, err := strconv.ParseInt(tsStr, 10, 64); err == nil {
|
|
entry.Timestamp = time.Unix(0, tsInt*1000)
|
|
}
|
|
}
|
|
if entry.Timestamp.IsZero() {
|
|
entry.Timestamp = time.Now()
|
|
}
|
|
|
|
if msg, ok := journalData["MESSAGE"].(string); ok {
|
|
entry.Message = msg
|
|
}
|
|
|
|
if priority, ok := journalData["PRIORITY"].(string); ok {
|
|
entry.Priority = priority
|
|
entry.Fields["priority_name"] = jep.getPriorityName(priority)
|
|
}
|
|
|
|
if pidStr, ok := journalData["_PID"].(string); ok {
|
|
if pid, err := strconv.Atoi(pidStr); err == nil {
|
|
entry.PID = pid
|
|
}
|
|
}
|
|
|
|
jep.extractSystemdFields(journalData, &entry)
|
|
|
|
if bootID, ok := journalData["_BOOT_ID"].(string); ok {
|
|
entry.BootID = bootID
|
|
}
|
|
if machineID, ok := journalData["_MACHINE_ID"].(string); ok {
|
|
entry.MachineID = machineID
|
|
}
|
|
|
|
entry.Raw = jsonLine
|
|
|
|
entry = jep.parseServiceSpecific(entry)
|
|
|
|
return entry, nil
|
|
}
|
|
|
|
func (jep *JournalEntryParser) getPriorityName(priority string) string {
|
|
priorityNames := map[string]string{
|
|
"0": "emergency",
|
|
"1": "alert",
|
|
"2": "critical",
|
|
"3": "error",
|
|
"4": "warning",
|
|
"5": "notice",
|
|
"6": "info",
|
|
"7": "debug",
|
|
}
|
|
|
|
if name, exists := priorityNames[priority]; exists {
|
|
return name
|
|
}
|
|
return "unknown"
|
|
}
|
|
|
|
func (jep *JournalEntryParser) extractSystemdFields(journalData map[string]any, entry *LogEntry) {
|
|
systemdFields := []string{
|
|
"_SYSTEMD_UNIT", "_SYSTEMD_USER_UNIT", "_SYSTEMD_SLICE",
|
|
"_BOOT_ID", "_MACHINE_ID", "_HOSTNAME", "_TRANSPORT",
|
|
"_CAP_EFFECTIVE", "_SELINUX_CONTEXT", "_AUDIT_SESSION",
|
|
"_AUDIT_LOGINUID", "_GID", "_UID", "_COMM", "_EXE",
|
|
"_CMDLINE", "_SYSTEMD_CGROUP", "_SYSTEMD_SESSION",
|
|
"_SYSTEMD_OWNER_UID", "_SOURCE_REALTIME_TIMESTAMP",
|
|
}
|
|
|
|
for _, field := range systemdFields {
|
|
if value, ok := journalData[field]; ok {
|
|
esFieldName := strings.ToLower(strings.TrimPrefix(field, "_"))
|
|
entry.Fields[esFieldName] = value
|
|
}
|
|
}
|
|
}
|
|
|
|
func (jep *JournalEntryParser) parseServiceSpecific(entry LogEntry) LogEntry {
|
|
switch jep.serviceName {
|
|
case "tixstream":
|
|
return parseTixstreamService(entry)
|
|
case "transfer-job-manager":
|
|
return parseTJMService(entry)
|
|
case "nginx":
|
|
return parseNginxService(entry)
|
|
case "access-manager":
|
|
return parseAMService(entry)
|
|
case "tixel-control-center":
|
|
return parseTCCService(entry)
|
|
default:
|
|
return entry
|
|
}
|
|
}
|
|
|
|
var (
|
|
amServicePattern = regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z)\s+(\w+)\s+(\d+)\s+---\s+\[\s*([^\]]*)\]\s+([\w\.]+)\s*:\s*(.*)$`)
|
|
tccServicePattern = regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z)\s+(\w+)\s+(\d+)\s+---\s+\[\s*([^\]]*)\]\s+([\w\.]+)\s*:\s*(.*)$`)
|
|
tjmServicePattern = regexp.MustCompile(`^(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\s+(?<level>\S+)\s+(?<pid>\d+).*?\[(?<collatation_id>[^\]]*)\]\s+\[(?<username>[^\]]*)\]\s+\[(?<thread>[^\]]*)\]\s+(?<class>.*?)\s+:\s+(?<message>.*)`)
|
|
tjmTransferNamePattern = regexp.MustCompile(`^(\d{8}T\d{6}-[A-Za-z0-9]+-[A-Za-z]+-(?:in|out)) ?: (.*)$`)
|
|
tsServicePattern = regexp.MustCompile(`^(?<level>\S+)\s+(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6})\s+(?<message>.*)`)
|
|
tsTransferIDPattern = regexp.MustCompile(`^(?<transfer>\w{8}-\w{4}-\w{4}-\w{4}-\w{12})\s+(?<message>.*)`)
|
|
nginxAccessPattern = regexp.MustCompile(`^(\S+)\s+\S+\s+(\S+)\s+\[([^\]]+)\]\s+"([^"]+)"\s+(\d+)\s+(\d+|-)\s*(?:"([^"]*)"\s+"([^"]*)")?`)
|
|
tjmInnerLogPattern = regexp.MustCompile(`^(?P<transfername>[^ ]+) : (?P<javaclass>\w+)\.(?P<methode>\w+): started transfer session (?P<transferid>[a-f0-9\-]+) on (?P<localaddr>[\d\.]+:\d+) with target address (?P<targetaddr>[a-zA-Z0-9\.\-]+:\d+)`)
|
|
)
|
|
|
|
func parseTixstreamService(entry LogEntry) LogEntry {
|
|
newEntry := entry
|
|
fields := make(map[string]any)
|
|
matches := tsServicePattern.FindStringSubmatch(newEntry.Message)
|
|
if len(matches) > 0 {
|
|
timestamp := strings.Join(strings.Split(matches[2], " "), "T")
|
|
fields["log_level"] = strings.TrimSpace(matches[1])
|
|
fields["message_timestamp"] = timestamp
|
|
fields["log_message"] = strings.TrimSpace(matches[3])
|
|
} else {
|
|
fields["log_message"] = newEntry.Message
|
|
}
|
|
trNameMatch := tsTransferIDPattern.FindStringSubmatch(fields["log_message"].(string))
|
|
if len(trNameMatch) > 0 {
|
|
fields["transfer_id"] = trNameMatch[1]
|
|
fields["log_message"] = trNameMatch[2]
|
|
split := strings.Fields(trNameMatch[2])
|
|
switch split[0] {
|
|
case "in:":
|
|
fields["transfer_direction"] = "incoming"
|
|
case "out:":
|
|
fields["transfer_direction"] = "outgoing"
|
|
}
|
|
}
|
|
|
|
msg := strings.ReplaceAll(newEntry.Message, " ", " ")
|
|
parts := strings.Fields(msg)
|
|
|
|
if len(parts) < 5 {
|
|
return newEntry
|
|
}
|
|
|
|
newEntry.Fields = fields
|
|
return newEntry
|
|
}
|
|
|
|
// func parseTixstreamService(entry LogEntry) LogEntry {
|
|
// newEntry := entry
|
|
// msg := strings.ReplaceAll(entry.Message, " ", " ")
|
|
// parts := strings.Fields(msg)
|
|
// if len(parts) < 5 {
|
|
// return newEntry
|
|
// }
|
|
//
|
|
// info := parts[4:]
|
|
//
|
|
// if newEntry.Fields == nil {
|
|
// newEntry.Fields = make(map[string]any)
|
|
// }
|
|
// newEntry.Fields["log_level"] = parts[0]
|
|
// newEntry.Fields["message_date"] = parts[1]
|
|
// newEntry.Fields["message_time"] = parts[2]
|
|
// newEntry.Fields["transfer_id"] = parts[3]
|
|
// newEntry.Fields["log_message"] = strings.Join(info, " ")
|
|
//
|
|
// switch info[0] {
|
|
// case "in:":
|
|
// newEntry.Fields["log_type"] = "log_message"
|
|
// newEntry.Fields["transfer_direction"] = "incoming"
|
|
// newEntry.Fields["transfer_info"] = strings.Join(info[1:], " ")
|
|
// case "out:":
|
|
// newEntry.Fields["log_type"] = "log_message"
|
|
// newEntry.Fields["transfer_direction"] = "outgoing"
|
|
// newEntry.Fields["transfer_info"] = strings.Join(info[1:], " ")
|
|
// case "queue-stats:":
|
|
// newEntry.Fields["log_type"] = "queue-stats"
|
|
// newEntry.Fields["queue-stats"] = strings.Join(info[1:], " ")
|
|
// case "transfer:":
|
|
// newEntry.Fields["log_type"] = "transfer_info"
|
|
// newEntry.Fields["transfer_info"] = strings.Join(info[1:], " ")
|
|
// default:
|
|
// newEntry.Fields["log_type"] = "log_message"
|
|
// newEntry.Fields["transfer_info"] = strings.Join(info, " ")
|
|
// }
|
|
//
|
|
// return newEntry
|
|
// }
|
|
|
|
func parseTJMService(entry LogEntry) LogEntry {
|
|
newEntry := entry
|
|
logContent := entry.Message
|
|
msg := strings.TrimSpace(logContent)
|
|
msg = strings.ReplaceAll(msg, " ", " ")
|
|
msg = strings.ReplaceAll(msg, "---", "")
|
|
msg = strings.ReplaceAll(msg, " ", " ")
|
|
parts := strings.Fields(msg)
|
|
if len(parts) < 4 {
|
|
return newEntry
|
|
}
|
|
fields := make(map[string]any)
|
|
matches := tjmServicePattern.FindStringSubmatch(logContent)
|
|
if len(matches) > 0 {
|
|
timestamp := strings.Join(strings.Split(matches[1], " "), "T")
|
|
fields["message_timestamp"] = timestamp
|
|
fields["log_level"] = strings.TrimSpace(matches[2])
|
|
fields["pid"] = strings.TrimSpace(matches[3])
|
|
fields["correlation_id"] = strings.TrimSpace(matches[4])
|
|
fields["username"] = strings.TrimSpace(matches[5])
|
|
fields["thread_id"] = strings.TrimSpace(matches[6])
|
|
fields["java_class"] = strings.TrimSpace(matches[7])
|
|
fields["log_message"] = strings.TrimSpace(matches[8])
|
|
} else {
|
|
fields["log_message"] = logContent
|
|
}
|
|
trNameMatch := tjmTransferNamePattern.FindStringSubmatch(fields["log_message"].(string))
|
|
if len(trNameMatch) > 0 {
|
|
fields["transfer_name"] = trNameMatch[1]
|
|
fields["log_message"] = trNameMatch[2]
|
|
if strings.Contains(trNameMatch[1], "-in") {
|
|
fields["transfer_direction"] = "incoming"
|
|
}
|
|
if strings.Contains(trNameMatch[1], "-out") {
|
|
fields["transfer_direction"] = "outgoing"
|
|
}
|
|
}
|
|
value, ok := newEntry.Fields["log_message"]
|
|
if ok {
|
|
matches := tjmInnerLogPattern.FindStringSubmatch(value.(string))
|
|
groups := tjmInnerLogPattern.SubexpNames()
|
|
if len(matches) >= 1 {
|
|
for i, name := range groups {
|
|
if i != 0 && name != "" {
|
|
newEntry.Fields[name] = matches[i]
|
|
}
|
|
}
|
|
} else {
|
|
if strings.Contains(value.(string), "TransferJobTixstreamFile") || strings.Contains(value.(string), "PeerJobController.handlePeerJobAction") {
|
|
if strings.Contains(value.(string), "started transfer session") {
|
|
tmpSplit := strings.Split(value.(string), " ")
|
|
newEntry.Fields["transfer_name"] = tmpSplit[0]
|
|
newEntry.Fields["java_class_method"] = tmpSplit[2]
|
|
transferID, _ := strings.CutPrefix(strings.Join(tmpSplit[3:], " "), "started transfer session")
|
|
newEntry.Fields["transfer-id"] = strings.Split(transferID, " ")[0]
|
|
} else if strings.Contains(value.(string), "set transfer session id") {
|
|
tmpSplit := strings.Split(value.(string), " ")
|
|
newEntry.Fields["java_class_method"] = tmpSplit[0]
|
|
transferID := tmpSplit[len(tmpSplit)-1]
|
|
newEntry.Fields["transfer-id"] = transferID
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
newEntry.Fields = fields
|
|
|
|
return newEntry
|
|
}
|
|
|
|
// func parseTJMService(entry LogEntry) LogEntry {
|
|
// newEntry := entry
|
|
// msg := strings.TrimSpace(entry.Message)
|
|
// msg = strings.ReplaceAll(msg, " ", " ")
|
|
// msg = strings.ReplaceAll(msg, "---", "")
|
|
// msg = strings.ReplaceAll(msg, " ", " ")
|
|
// parts := strings.Fields(msg)
|
|
// if len(parts) < 4 {
|
|
// return newEntry
|
|
// }
|
|
//
|
|
// info := parts[3:]
|
|
//
|
|
// if newEntry.Fields == nil {
|
|
// newEntry.Fields = make(map[string]any)
|
|
// }
|
|
// newEntry.Fields["log_level"] = parts[2]
|
|
// newEntry.Fields["message_date"] = parts[0]
|
|
// newEntry.Fields["message_time"] = parts[1]
|
|
// newEntry.Fields["message"] = strings.Join(info, " ")
|
|
//
|
|
// tmpInfo := strings.ReplaceAll(strings.Join(info, " "), "[ ]", "[]")
|
|
// tmpInfo = strings.ReplaceAll(tmpInfo, "[ ", "[")
|
|
// tmpSplit := strings.Fields(tmpInfo)
|
|
// if len(tmpSplit) > 4 {
|
|
// newEntry.Fields["username"] = tmpSplit[2]
|
|
// newEntry.Fields["correlation_id"] = tmpSplit[1]
|
|
// newEntry.Fields["thread_id"] = tmpSplit[3]
|
|
// newEntry.Fields["java_class"] = tmpSplit[4]
|
|
// newEntry.Fields["log_type"] = "log_message"
|
|
// if len(tmpSplit) > 6 && strings.Contains(tmpSplit[6], "-out") {
|
|
// newEntry.Fields["transfer_direction"] = "outgoing"
|
|
// newEntry.Fields["log_message"] = strings.Join(tmpSplit[7:], " ")
|
|
// } else if len(tmpSplit) > 6 && strings.Contains(tmpSplit[6], "-in") {
|
|
// newEntry.Fields["transfer_direction"] = "incoming"
|
|
// newEntry.Fields["log_message"] = strings.Join(tmpSplit[7:], " ")
|
|
// } else {
|
|
// newEntry.Fields["log_message"] = strings.Join(tmpSplit[6:], " ")
|
|
// }
|
|
// }
|
|
// value, ok := newEntry.Fields["log_message"]
|
|
// if ok {
|
|
// matches := tjmInnerLogPattern.FindStringSubmatch(value.(string))
|
|
// groups := tjmInnerLogPattern.SubexpNames()
|
|
// if len(matches) >= 1 {
|
|
// for i, name := range groups {
|
|
// if i != 0 && name != "" {
|
|
// newEntry.Fields[name] = matches[i]
|
|
// }
|
|
// }
|
|
// } else {
|
|
// if strings.Contains(value.(string), "TransferJobTixstreamFile") || strings.Contains(value.(string), "PeerJobController.handlePeerJobAction") {
|
|
// if strings.Contains(value.(string), "started transfer session") {
|
|
// tmpSplit := strings.Split(value.(string), " ")
|
|
// newEntry.Fields["transfer_name"] = tmpSplit[0]
|
|
// newEntry.Fields["java_class_method"] = tmpSplit[2]
|
|
// transferID, _ := strings.CutPrefix(strings.Join(tmpSplit[3:], " "), "started transfer session")
|
|
// newEntry.Fields["transfer-id"] = strings.Split(transferID, " ")[0]
|
|
// } else if strings.Contains(value.(string), "set transfer session id") {
|
|
// tmpSplit := strings.Split(value.(string), " ")
|
|
// newEntry.Fields["java_class_method"] = tmpSplit[0]
|
|
// transferID := tmpSplit[len(tmpSplit)-1]
|
|
// newEntry.Fields["transfer-id"] = transferID
|
|
// }
|
|
// }
|
|
// }
|
|
// }
|
|
//
|
|
// return newEntry
|
|
// }
|
|
|
|
func parseAMService(entry LogEntry) LogEntry {
|
|
newEntry := entry
|
|
if newEntry.Fields == nil {
|
|
newEntry.Fields = make(map[string]any)
|
|
}
|
|
|
|
matches := amServicePattern.FindStringSubmatch(strings.TrimSpace(entry.Message))
|
|
if len(matches) != 7 {
|
|
return newEntry
|
|
}
|
|
|
|
newEntry.Fields["log_level"] = matches[2]
|
|
newEntry.Fields["process_id"] = matches[3]
|
|
newEntry.Fields["thread_name"] = strings.TrimSpace(matches[4])
|
|
newEntry.Fields["logger_name"] = matches[5]
|
|
newEntry.Fields["log_message"] = matches[6]
|
|
|
|
return newEntry
|
|
}
|
|
|
|
func parseTCCService(entry LogEntry) LogEntry {
|
|
newEntry := entry
|
|
if newEntry.Fields == nil {
|
|
newEntry.Fields = make(map[string]any)
|
|
}
|
|
|
|
matches := tccServicePattern.FindStringSubmatch(strings.TrimSpace(entry.Message))
|
|
if len(matches) != 7 {
|
|
return newEntry
|
|
}
|
|
|
|
newEntry.Fields["timestamp"] = matches[1]
|
|
newEntry.Fields["log_level"] = matches[2]
|
|
newEntry.Fields["process_id"] = matches[3]
|
|
newEntry.Fields["thread_name"] = strings.TrimSpace(matches[4])
|
|
newEntry.Fields["logger_name"] = matches[5]
|
|
newEntry.Fields["log_message"] = matches[6]
|
|
|
|
return newEntry
|
|
}
|
|
|
|
func parseNginxService(entry LogEntry) LogEntry {
|
|
newEntry := entry
|
|
if newEntry.Fields == nil {
|
|
newEntry.Fields = make(map[string]any)
|
|
}
|
|
|
|
matches := nginxAccessPattern.FindStringSubmatch(strings.TrimSpace(entry.Message))
|
|
if len(matches) < 7 {
|
|
return newEntry
|
|
}
|
|
|
|
newEntry.Fields["client_ip"] = matches[1]
|
|
newEntry.Fields["remote_user"] = matches[2]
|
|
newEntry.Fields["timestamp"] = matches[3]
|
|
newEntry.Fields["request"] = matches[4]
|
|
newEntry.Fields["status_code"] = matches[5]
|
|
newEntry.Fields["bytes_sent"] = matches[6]
|
|
|
|
if len(matches) > 7 && matches[7] != "" {
|
|
newEntry.Fields["referer"] = matches[7]
|
|
}
|
|
if len(matches) > 8 && matches[8] != "" {
|
|
newEntry.Fields["user_agent"] = matches[8]
|
|
}
|
|
|
|
if requestParts := strings.Fields(matches[4]); len(requestParts) >= 3 {
|
|
newEntry.Fields["http_method"] = requestParts[0]
|
|
newEntry.Fields["request_uri"] = requestParts[1]
|
|
newEntry.Fields["http_version"] = requestParts[2]
|
|
}
|
|
|
|
return newEntry
|
|
}
|