250 lines
6.6 KiB
Go
250 lines
6.6 KiB
Go
package collector
|
||
|
||
import (
|
||
"bufio"
|
||
"context"
|
||
"fmt"
|
||
"io"
|
||
"log"
|
||
"os"
|
||
"strings"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
drain3go "codeberg.org/pata1704/drain3"
|
||
"codeberg.org/pata1704/guenther/internal/config"
|
||
idrain3 "codeberg.org/pata1704/guenther/internal/drain3"
|
||
"codeberg.org/pata1704/guenther/pkg/types"
|
||
"github.com/fsnotify/fsnotify"
|
||
)
|
||
|
||
// linePool hands out reusable *strings.Builder values so that callers on the
// line-reading path can borrow a builder instead of allocating a new one.
//
// NOTE(review): no code visible in this file calls Get or Put on the pool.
// Also be aware that strings.Builder.Reset releases the builder's underlying
// buffer, so pooling builders saves the Builder header but not the backing
// array — confirm this is the intended trade-off before relying on it.
var linePool = sync.Pool{
	New: func() any {
		return &strings.Builder{}
	},
}
|
||
|
||
// LogCollector tails a log file using inotify (fsnotify) and emits a
|
||
// types.LogEvent for every non-empty line.
|
||
//
|
||
// Processing pipeline per line:
|
||
// 1. ApplyMasking – extracts named parameters and masks the line.
|
||
// 2. Drain3.Parse – mines a template ID from the masked line.
|
||
// 3. Severity – classified from the raw line.
|
||
// 4. Emit – non-blocking channel send with drop counter.
|
||
//
|
||
// The collector uses a single goroutine per file and a WaitGroup for clean
|
||
// shutdown.
|
||
type LogCollector struct {
|
||
cfg *config.Config
|
||
miner *drain3go.TemplateMiner
|
||
outputChan chan<- types.LogEvent
|
||
healthChan chan<- types.StageHealth
|
||
|
||
wg sync.WaitGroup
|
||
|
||
processed atomic.Uint64
|
||
dropped atomic.Uint64
|
||
}
|
||
|
||
// NewLogCollector creates a LogCollector wired to the provided channels.
|
||
// Drain3 is initialised with an in-memory persistence store; the template
|
||
// tree is rebuilt from scratch on restart (state persistence can be added
|
||
// via FilePersistence if needed).
|
||
func NewLogCollector(
|
||
cfg *config.Config,
|
||
output chan<- types.LogEvent,
|
||
health chan<- types.StageHealth,
|
||
) *LogCollector {
|
||
dc := drain3go.DefaultConfig()
|
||
dc.SimTh = cfg.Drain.SimThreshold
|
||
dc.Depth = cfg.Drain.Depth
|
||
dc.MaxChildren = cfg.Drain.MaxChildren
|
||
|
||
miner := drain3go.NewTemplateMiner(dc, drain3go.NewMemoryPersistence())
|
||
|
||
return &LogCollector{
|
||
cfg: cfg,
|
||
miner: miner,
|
||
outputChan: output,
|
||
healthChan: health,
|
||
}
|
||
}
|
||
|
||
// Start begins tailing cfg.Ingestion.LogPath.
|
||
// The method returns an error if the file cannot be opened or if the
|
||
// inotify watcher cannot be created. Subsequent errors during tailing are
|
||
// logged but do not propagate.
|
||
func (c *LogCollector) Start(ctx context.Context) error {
|
||
f, err := os.Open(c.cfg.Ingestion.LogPath)
|
||
if err != nil {
|
||
return fmt.Errorf("log collector: open %q: %w", c.cfg.Ingestion.LogPath, err)
|
||
}
|
||
|
||
// Seek to end: only tail new content, not existing content.
|
||
if _, err := f.Seek(0, io.SeekEnd); err != nil {
|
||
f.Close()
|
||
return fmt.Errorf("log collector: seek %q: %w", c.cfg.Ingestion.LogPath, err)
|
||
}
|
||
|
||
watcher, err := fsnotify.NewWatcher()
|
||
if err != nil {
|
||
f.Close()
|
||
return fmt.Errorf("log collector: create fsnotify watcher: %w", err)
|
||
}
|
||
if err := watcher.Add(c.cfg.Ingestion.LogPath); err != nil {
|
||
f.Close()
|
||
watcher.Close()
|
||
return fmt.Errorf("log collector: watch %q: %w", c.cfg.Ingestion.LogPath, err)
|
||
}
|
||
|
||
reader := bufio.NewReaderSize(f, 64*1024)
|
||
reportTicker := time.NewTicker(5 * time.Second)
|
||
|
||
c.wg.Go(func() {
|
||
defer f.Close()
|
||
defer watcher.Close()
|
||
defer reportTicker.Stop()
|
||
|
||
for {
|
||
select {
|
||
case event, ok := <-watcher.Events:
|
||
if !ok {
|
||
return
|
||
}
|
||
if event.Has(fsnotify.Write) {
|
||
c.drainReader(reader)
|
||
}
|
||
if event.Has(fsnotify.Remove) || event.Has(fsnotify.Rename) {
|
||
// Log rotation: reopen the file.
|
||
log.Printf("log collector: file %q rotated – reopening", c.cfg.Ingestion.LogPath)
|
||
f.Close()
|
||
newF, err := c.reopenFile()
|
||
if err != nil {
|
||
log.Printf("log collector: reopen after rotation: %v", err)
|
||
return
|
||
}
|
||
f = newF
|
||
reader = bufio.NewReaderSize(f, 64*1024)
|
||
if err := watcher.Add(c.cfg.Ingestion.LogPath); err != nil {
|
||
log.Printf("log collector: re-watch after rotation: %v", err)
|
||
}
|
||
}
|
||
|
||
case err, ok := <-watcher.Errors:
|
||
if !ok {
|
||
return
|
||
}
|
||
log.Printf("log collector: watcher error: %v", err)
|
||
|
||
case <-reportTicker.C:
|
||
c.emitHealth()
|
||
|
||
case <-ctx.Done():
|
||
return
|
||
}
|
||
}
|
||
})
|
||
|
||
return nil
|
||
}
|
||
|
||
// Wait waits for the collector goroutine to exit after context cancellation.
|
||
func (c *LogCollector) Wait() {
|
||
c.wg.Wait()
|
||
}
|
||
|
||
// drainReader reads all complete lines currently available in reader and
|
||
// processes each one. Partial lines (no trailing newline) are left in the
|
||
// bufio buffer for the next Write event.
|
||
func (c *LogCollector) drainReader(r *bufio.Reader) {
|
||
for {
|
||
line, err := r.ReadString('\n')
|
||
if len(line) > 0 {
|
||
c.processLine(strings.TrimRight(line, "\r\n"))
|
||
}
|
||
if err != nil {
|
||
// io.EOF means no more complete lines; any other error is logged.
|
||
if err != io.EOF {
|
||
log.Printf("log collector: read error: %v", err)
|
||
}
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
// processLine applies masking, mines a Drain3 template, classifies severity,
|
||
// and emits a LogEvent. The send is non-blocking; full channels increment the
|
||
// dropped counter if the pipeline is backlogged.
|
||
func (c *LogCollector) processLine(line string) {
|
||
if line == "" {
|
||
return
|
||
}
|
||
|
||
// Phase 1+2: masking and parameter extraction.
|
||
masked, params := idrain3.ApplyMasking(line, c.cfg.Drain.MaskingPatterns)
|
||
|
||
// Phase 3: template mining on the masked line.
|
||
result := c.miner.AddLogMessage(masked)
|
||
if result == nil {
|
||
return
|
||
}
|
||
|
||
event := types.LogEvent{
|
||
Timestamp: time.Now(),
|
||
TemplateID: result.ClusterID,
|
||
Params: params,
|
||
Severity: classifySeverity(line),
|
||
RawLine: line,
|
||
}
|
||
|
||
select {
|
||
case c.outputChan <- event:
|
||
c.processed.Add(1)
|
||
default:
|
||
c.dropped.Add(1)
|
||
}
|
||
}
|
||
|
||
// reopenFile opens cfg.Ingestion.LogPath after log rotation, seeking to the
|
||
// beginning of the new file.
|
||
func (c *LogCollector) reopenFile() (*os.File, error) {
|
||
f, err := os.Open(c.cfg.Ingestion.LogPath)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("open: %w", err)
|
||
}
|
||
return f, nil
|
||
}
|
||
|
||
// emitHealth sends a StageHealth snapshot; non-blocking (drops if full).
|
||
func (c *LogCollector) emitHealth() {
|
||
p := c.processed.Load()
|
||
d := c.dropped.Load()
|
||
select {
|
||
case c.healthChan <- types.StageHealth{
|
||
StageName: "log_collector",
|
||
EventsProcessed: p,
|
||
EventsDropped: d,
|
||
Throughput: float64(p) / 5.0,
|
||
LastUpdate: time.Now(),
|
||
}:
|
||
default:
|
||
}
|
||
}
|
||
|
||
// classifySeverity derives a severity label from a raw log line by looking
// for well-known keywords, ignoring case. The first matching rule wins:
//
//   - "ERROR": the line contains "ERROR", "FATAL" or "CRITICAL" anywhere, or
//     the abbreviation "ERR" as a standalone token (not adjacent to another
//     ASCII letter, e.g. "[err]" or "err=timeout").
//   - "WARN":  the line contains "WARN" (which also covers "WARNING").
//   - "DEBUG": the line contains "DEBUG".
//   - "INFO":  everything else, including the empty line.
//
// "ERR" is matched only as a standalone token because as a bare substring it
// occurs inside many ordinary words ("interrupted", "preferred",
// "overridden", "stderr") and would misclassify those lines as errors.
func classifySeverity(line string) string {
	upper := strings.ToUpper(line)
	switch {
	case strings.Contains(upper, "ERROR"),
		strings.Contains(upper, "FATAL"),
		strings.Contains(upper, "CRITICAL"),
		hasStandaloneToken(upper, "ERR"):
		return "ERROR"
	case strings.Contains(upper, "WARN"):
		return "WARN"
	case strings.Contains(upper, "DEBUG"):
		return "DEBUG"
	default:
		return "INFO"
	}
}

// hasStandaloneToken reports whether token occurs in s at a position where
// neither neighbouring byte is an ASCII upper-case letter. Both arguments are
// expected to be upper-cased already.
func hasStandaloneToken(s, token string) bool {
	if token == "" {
		return false
	}
	for from := 0; ; {
		i := strings.Index(s[from:], token)
		if i < 0 {
			return false
		}
		start := from + i
		end := start + len(token)
		leftOK := start == 0 || s[start-1] < 'A' || s[start-1] > 'Z'
		rightOK := end == len(s) || s[end] < 'A' || s[end] > 'Z'
		if leftOK && rightOK {
			return true
		}
		// Resume one byte further on so overlapping candidates are not missed.
		from = start + 1
	}
}
|