watch-tool/log_processor.go

83 lines
1.6 KiB
Go

package main
import (
"context"
"log/slog"
"time"
"github.com/elastic/go-elasticsearch/v8"
)
type LogProcessor struct {
sender ElasticsearchSender
baseIndex string
batchSize int
}
func NewLogProcessor(es *elasticsearch.Client, baseIndex string) *LogProcessor {
return &LogProcessor{
sender: NewElasticsearchSender(es),
baseIndex: baseIndex,
batchSize: 100,
}
}
func (lp *LogProcessor) Start(ctx context.Context, logChan <-chan LogEntry) {
batch := make([]LogEntry, 0, lp.batchSize)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
slog.Info("Log processor started", "batch_size", lp.batchSize)
for {
select {
case <-ctx.Done():
if len(batch) > 0 {
lp.sendBatch(batch)
}
slog.Info("Log processor stopped")
return
case entry, ok := <-logChan:
if !ok {
if len(batch) > 0 {
lp.sendBatch(batch)
}
return
}
batch = append(batch, entry)
if len(batch) >= lp.batchSize {
lp.sendBatch(batch)
batch = batch[:0]
}
case <-ticker.C:
if len(batch) > 0 {
lp.sendBatch(batch)
batch = batch[:0]
}
}
}
}
func (lp *LogProcessor) sendBatch(batch []LogEntry) {
if len(batch) == 0 {
return
}
if err := lp.sender.SendBatch(lp.baseIndex, batch); err != nil {
slog.Error("error sending log batch", "error", err, "batch_size", len(batch))
return
}
slog.Debug("Log batch sent successfully", "batch_size", len(batch))
}
func (lp *LogProcessor) SetBatchSize(size int) {
if size > 0 {
lp.batchSize = size
slog.Info("Log processor batch size changed", "new_size", size)
}
}