80 lines
1.5 KiB
Go
80 lines
1.5 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"time"
|
|
"tixel_watch/models"
|
|
)
|
|
|
|
type LogProcessor struct {
|
|
storage StorageInterface
|
|
batchSize int
|
|
}
|
|
|
|
func NewLogProcessor(storage StorageInterface) *LogProcessor {
|
|
return &LogProcessor{
|
|
storage: storage,
|
|
batchSize: 100,
|
|
}
|
|
}
|
|
|
|
func (lp *LogProcessor) Start(ctx context.Context, logChan <-chan models.LogMessage) {
|
|
batch := make([]models.LogMessage, 0, lp.batchSize)
|
|
ticker := time.NewTicker(5 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
slog.Info("Log processor started", "batch_size", lp.batchSize)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
if len(batch) > 0 {
|
|
lp.storeBatch(ctx, batch)
|
|
}
|
|
slog.Info("Log processor stopped")
|
|
return
|
|
|
|
case entry, ok := <-logChan:
|
|
if !ok {
|
|
if len(batch) > 0 {
|
|
lp.storeBatch(ctx, batch)
|
|
}
|
|
return
|
|
}
|
|
|
|
batch = append(batch, entry)
|
|
|
|
if len(batch) >= lp.batchSize {
|
|
lp.storeBatch(ctx, batch)
|
|
batch = batch[:0]
|
|
}
|
|
|
|
case <-ticker.C:
|
|
if len(batch) > 0 {
|
|
lp.storeBatch(ctx, batch)
|
|
batch = batch[:0]
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (lp *LogProcessor) storeBatch(ctx context.Context, batch []models.LogMessage) {
|
|
if len(batch) == 0 {
|
|
return
|
|
}
|
|
|
|
if err := lp.storage.StoreBatch(ctx, batch); err != nil {
|
|
slog.Error("error storing log batch", "error", err, "batch_size", len(batch))
|
|
return
|
|
}
|
|
|
|
slog.Debug("Log batch stored successfully", "batch_size", len(batch))
|
|
}
|
|
|
|
func (lp *LogProcessor) SetBatchSize(size int) {
|
|
if size > 0 {
|
|
lp.batchSize = size
|
|
slog.Info("Log processor batch size changed", "new_size", size)
|
|
}
|
|
}
|