guenther/internal/collector/log.go

250 lines
6.6 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package collector
import (
"bufio"
"context"
"fmt"
"io"
"log"
"os"
"strings"
"sync"
"sync/atomic"
"time"
drain3go "codeberg.org/pata1704/drain3"
"codeberg.org/pata1704/guenther/internal/config"
idrain3 "codeberg.org/pata1704/guenther/internal/drain3"
"codeberg.org/pata1704/guenther/pkg/types"
"github.com/fsnotify/fsnotify"
)
// linePool recycles *strings.Builder instances so that line handling on
// high-volume log files can reuse builders instead of allocating new ones.
// New hands out an empty builder whenever the pool has none to offer.
//
// NOTE(review): nothing in the visible part of this file calls linePool.Get
// or linePool.Put. Confirm the pool is still used elsewhere, or remove it.
var linePool = sync.Pool{
New: func() any { return new(strings.Builder) },
}
// LogCollector tails a log file using fsnotify and attempts to emit a
// types.LogEvent for every non-empty line.
//
// Processing pipeline per line:
//  1. ApplyMasking: extracts named parameters and masks the line.
//  2. Template mining: the masked line is fed to the Drain3 miner, which
//     yields the cluster ID used as the template ID.
//  3. Severity: classified from the raw (unmasked) line.
//  4. Emit: non-blocking channel send; events that cannot be sent are
//     counted as dropped.
//
// The collector uses a single goroutine per file and a WaitGroup for clean
// shutdown.
type LogCollector struct {
// cfg supplies the log path (Ingestion.LogPath) and the Drain settings.
cfg *config.Config
// miner is the Drain3 template miner that masked lines are fed into.
miner *drain3go.TemplateMiner
// outputChan is the send-only channel LogEvents are offered to.
outputChan chan<- types.LogEvent
// healthChan is the send-only channel StageHealth snapshots are offered to.
healthChan chan<- types.StageHealth
// wg tracks the tailing goroutine started by Start; Wait blocks on it.
wg sync.WaitGroup
// processed counts events that were successfully sent on outputChan.
processed atomic.Uint64
// dropped counts events discarded because outputChan was not ready.
dropped atomic.Uint64
}
// NewLogCollector builds a LogCollector that publishes events on output and
// health snapshots on health.
//
// The Drain3 miner starts from drain3go.DefaultConfig with the similarity
// threshold, tree depth and max-children values taken from cfg.Drain. It is
// backed by an in-memory persistence store, so the template tree is rebuilt
// from scratch on restart (state persistence can be added via FilePersistence
// if needed).
func NewLogCollector(
	cfg *config.Config,
	output chan<- types.LogEvent,
	health chan<- types.StageHealth,
) *LogCollector {
	drainCfg := drain3go.DefaultConfig()
	drainCfg.SimTh = cfg.Drain.SimThreshold
	drainCfg.Depth = cfg.Drain.Depth
	drainCfg.MaxChildren = cfg.Drain.MaxChildren

	collector := &LogCollector{
		cfg:        cfg,
		outputChan: output,
		healthChan: health,
	}
	collector.miner = drain3go.NewTemplateMiner(drainCfg, drain3go.NewMemoryPersistence())
	return collector
}
// Start begins tailing cfg.Ingestion.LogPath from its current end and returns
// once the background goroutine has been launched.
//
// It returns an error if the file cannot be opened or positioned, or if the
// fsnotify watcher cannot be created or attached to the path. Errors that
// occur later, while tailing, are logged but do not propagate.
//
// The goroutine exits when ctx is cancelled, when the watcher's channels are
// closed, or when the file cannot be reopened after a rotation. Use Wait to
// block until it has exited.
func (c *LogCollector) Start(ctx context.Context) error {
	f, err := os.Open(c.cfg.Ingestion.LogPath)
	if err != nil {
		return fmt.Errorf("log collector: open %q: %w", c.cfg.Ingestion.LogPath, err)
	}
	// Seek to end: only tail new content, not existing content.
	if _, err := f.Seek(0, io.SeekEnd); err != nil {
		f.Close()
		return fmt.Errorf("log collector: seek %q: %w", c.cfg.Ingestion.LogPath, err)
	}
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		f.Close()
		return fmt.Errorf("log collector: create fsnotify watcher: %w", err)
	}
	if err := watcher.Add(c.cfg.Ingestion.LogPath); err != nil {
		f.Close()
		watcher.Close()
		return fmt.Errorf("log collector: watch %q: %w", c.cfg.Ingestion.LogPath, err)
	}
	reader := bufio.NewReaderSize(f, 64*1024)
	reportTicker := time.NewTicker(5 * time.Second)

	c.wg.Go(func() {
		// f is reassigned on rotation. A plain `defer f.Close()` would bind
		// the handle opened at startup and leak the one opened later, so the
		// close goes through a closure that reads f at exit time.
		defer func() { f.Close() }()
		defer watcher.Close()
		defer reportTicker.Stop()

		for {
			select {
			case event, ok := <-watcher.Events:
				if !ok {
					return
				}
				if event.Has(fsnotify.Write) {
					c.drainReader(reader)
				}
				if event.Has(fsnotify.Remove) || event.Has(fsnotify.Rename) {
					// Log rotation: reopen the file.
					log.Printf("log collector: file %q rotated reopening", c.cfg.Ingestion.LogPath)

					// Pick up anything still unread in the old file before
					// letting go of it.
					c.drainReader(reader)

					newF, err := c.reopenWithRetry(ctx)
					if err != nil {
						log.Printf("log collector: reopen after rotation: %v", err)
						return
					}
					f.Close()
					f = newF
					reader = bufio.NewReaderSize(f, 64*1024)
					if err := watcher.Add(c.cfg.Ingestion.LogPath); err != nil {
						log.Printf("log collector: re-watch after rotation: %v", err)
					}

					// Lines written to the new file before the watch was
					// re-established produce no further event; read them now.
					c.drainReader(reader)
				}
			case err, ok := <-watcher.Errors:
				if !ok {
					return
				}
				log.Printf("log collector: watcher error: %v", err)
			case <-reportTicker.C:
				c.emitHealth()
			case <-ctx.Done():
				return
			}
		}
	})
	return nil
}

// reopenWithRetry calls reopenFile until it succeeds, ctx is cancelled, or
// the attempts are exhausted. Rotation tools usually move the old file away
// before creating its replacement, so the first open can fail simply because
// the new file does not exist yet. It returns ctx.Err() on cancellation and
// the last reopenFile error once all attempts have failed.
func (c *LogCollector) reopenWithRetry(ctx context.Context) (*os.File, error) {
	const (
		maxAttempts = 10
		retryDelay  = 100 * time.Millisecond
	)

	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if attempt > 0 {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-time.After(retryDelay):
			}
		}
		f, err := c.reopenFile()
		if err == nil {
			return f, nil
		}
		lastErr = err
	}
	return nil, lastErr
}
// Wait blocks until the goroutine launched by Start has exited. That
// goroutine returns when the context passed to Start is cancelled, when the
// watcher's channels are closed, or when reopening the file after a rotation
// fails.
func (c *LogCollector) Wait() {
c.wg.Wait()
}
// drainReader processes every complete, newline-terminated line currently
// available from r and hands each one (without its line terminator) to
// processLine.
//
// An unterminated tail is left in the bufio buffer rather than consumed, so a
// line that the writer flushes in two pieces is emitted once, in full, after
// its newline arrives. bufio.Reader.ReadString cannot be used for this: at
// EOF it returns and consumes the partial data, which would split the line
// into two events.
//
// The one exception is a line longer than the reader's buffer: it cannot be
// held back, so it is read with ReadString and may be emitted in pieces if
// EOF is reached before its newline.
//
// io.EOF ends the drain silently; any other read error is logged.
func (c *LogCollector) drainReader(r *bufio.Reader) {
	for {
		// Asking for exactly the buffered byte count never triggers a read,
		// so this Peek cannot fail.
		pending, _ := r.Peek(r.Buffered())

		newline := -1
		for i, b := range pending {
			if b == '\n' {
				newline = i
				break
			}
		}

		if newline >= 0 {
			// Copy the line out before Discard: the peeked slice aliases the
			// reader's internal buffer.
			line := string(pending[:newline])
			// Cannot fail: the discarded bytes are already buffered.
			_, _ = r.Discard(newline + 1)
			c.processLine(strings.TrimRight(line, "\r"))
			continue
		}

		if len(pending) >= r.Size() {
			// The buffer is full and holds no newline, so the line cannot be
			// kept back any longer. Fall back to ReadString.
			line, err := r.ReadString('\n')
			if len(line) > 0 {
				c.processLine(strings.TrimRight(line, "\r\n"))
			}
			if err != nil {
				if err != io.EOF {
					log.Printf("log collector: read error: %v", err)
				}
				return
			}
			continue
		}

		// No newline buffered yet: pull in more bytes without consuming any.
		// Peek reports io.EOF when the file has nothing further to offer.
		if _, err := r.Peek(len(pending) + 1); err != nil {
			if err != io.EOF {
				log.Printf("log collector: read error: %v", err)
			}
			return
		}
	}
}
// processLine turns one raw log line into a types.LogEvent and offers it to
// the output channel.
//
// Empty lines are ignored. The line is masked with the configured masking
// patterns (which also yields the extracted parameters), the masked text is
// fed to the Drain3 miner, and the severity is classified from the unmasked
// line. A nil miner result discards the line without touching either counter.
//
// The send never blocks: a successful send increments processed, while a
// channel that is not ready increments dropped.
func (c *LogCollector) processLine(line string) {
	if len(line) == 0 {
		return
	}

	// Masking and parameter extraction.
	maskedLine, extracted := idrain3.ApplyMasking(line, c.cfg.Drain.MaskingPatterns)

	// Template mining on the masked line.
	cluster := c.miner.AddLogMessage(maskedLine)
	if cluster == nil {
		return
	}

	select {
	case c.outputChan <- types.LogEvent{
		Timestamp:  time.Now(),
		TemplateID: cluster.ClusterID,
		Params:     extracted,
		Severity:   classifySeverity(line),
		RawLine:    line,
	}:
		c.processed.Add(1)
	default:
		c.dropped.Add(1)
	}
}
// reopenFile opens cfg.Ingestion.LogPath again after a log rotation. The
// returned handle is freshly opened and therefore positioned at the start of
// the file. A failure is returned wrapped with an "open:" prefix.
func (c *LogCollector) reopenFile() (*os.File, error) {
	reopened, openErr := os.Open(c.cfg.Ingestion.LogPath)
	if openErr == nil {
		return reopened, nil
	}
	return nil, fmt.Errorf("open: %w", openErr)
}
// emitHealth publishes a StageHealth snapshot for the "log_collector" stage
// built from the current processed and dropped counters. The send never
// blocks: if the health channel is not ready the snapshot is discarded.
//
// NOTE(review): Throughput divides the cumulative processed count by 5, so it
// keeps growing for as long as the collector runs. If it is meant to be
// events per second over the last reporting interval, it needs the delta
// since the previous snapshot. Confirm the intended meaning.
func (c *LogCollector) emitHealth() {
	processed, dropped := c.processed.Load(), c.dropped.Load()

	snapshot := types.StageHealth{
		StageName:       "log_collector",
		EventsProcessed: processed,
		EventsDropped:   dropped,
		Throughput:      float64(processed) / 5.0,
		LastUpdate:      time.Now(),
	}

	select {
	case c.healthChan <- snapshot:
	default:
	}
}
// classifySeverity derives a coarse severity level from a raw log line by
// scanning it, case-insensitively, for well-known keywords. It returns one of
// "ERROR", "WARN", "DEBUG" or "INFO".
//
// Precedence is ERROR, then WARN, then DEBUG; a line with no keyword is
// "INFO".
//   - "ERROR", "FATAL" and "CRITICAL" match anywhere in the line, so
//     identifiers such as "TypeError" still count.
//   - "ERR" only matches as a standalone word (as in "<3>err:" or
//     "ERR_CONNECTION_REFUSED"). As a plain substring it would also fire on
//     ordinary words such as "referrer", "stderr" or "interrupt".
//   - "WARN" matches anywhere, which also covers "WARNING".
//   - "DEBUG" matches anywhere.
func classifySeverity(line string) string {
	upper := strings.ToUpper(line)
	switch {
	case strings.Contains(upper, "ERROR"),
		strings.Contains(upper, "FATAL"),
		strings.Contains(upper, "CRITICAL"),
		containsStandaloneWord(upper, "ERR"):
		return "ERROR"
	case strings.Contains(upper, "WARN"):
		return "WARN"
	case strings.Contains(upper, "DEBUG"):
		return "DEBUG"
	default:
		return "INFO"
	}
}

// containsStandaloneWord reports whether word occurs in s without an ASCII
// upper-case letter directly before or after it. Both arguments are expected
// to be upper-cased already, so any other byte (digit, punctuation, space,
// underscore, non-ASCII) counts as a word boundary.
func containsStandaloneWord(s, word string) bool {
	isUpper := func(b byte) bool { return 'A' <= b && b <= 'Z' }

	for from := 0; from < len(s); {
		i := strings.Index(s[from:], word)
		if i < 0 {
			return false
		}
		start := from + i
		end := start + len(word)
		boundedLeft := start == 0 || !isUpper(s[start-1])
		boundedRight := end == len(s) || !isUpper(s[end])
		if boundedLeft && boundedRight {
			return true
		}
		// Embedded in a longer word: keep looking after this occurrence.
		from = start + 1
	}
	return false
}