watch-tool/export_manager.go
2025-09-24 08:30:13 +02:00

201 lines
4.9 KiB
Go

package main
import (
"context"
"fmt"
"log/slog"
"maps"
"strings"
"sync"
"time"
)
type ExportManager struct {
storage StorageInterface
exporters map[string]ExporterInterface
config ExportManagerConfig
mu sync.RWMutex
}
type ExportManagerConfig struct {
BatchSize int
ExportInterval time.Duration
RetryAttempts int
RetryBackoff time.Duration
HealthCheckInterval time.Duration
}
func NewExportManager(storage StorageInterface, config ExportManagerConfig) *ExportManager {
return &ExportManager{
storage: storage,
exporters: make(map[string]ExporterInterface),
config: config,
}
}
func (em *ExportManager) RegisterExporter(name string, exporter ExporterInterface) {
em.mu.Lock()
defer em.mu.Unlock()
em.exporters[name] = exporter
slog.Info("Exporter registered", "name", name)
}
func (em *ExportManager) UnregisterExporter(name string) {
em.mu.Lock()
defer em.mu.Unlock()
delete(em.exporters, name)
slog.Info("Exporter unregistered", "name", name)
}
func (em *ExportManager) Start(ctx context.Context) {
exportTicker := time.NewTicker(em.config.ExportInterval)
defer exportTicker.Stop()
healthTicker := time.NewTicker(em.config.HealthCheckInterval)
defer healthTicker.Stop()
slog.Info("Export manager started",
"batch_size", em.config.BatchSize,
"export_interval", em.config.ExportInterval,
)
for {
select {
case <-ctx.Done():
slog.Info("Export manager stopped")
return
case <-exportTicker.C:
em.exportBatch(ctx)
case <-healthTicker.C:
em.performHealthChecks(ctx)
}
}
}
func (em *ExportManager) exportBatch(ctx context.Context) {
entries, err := em.storage.(*SQLiteStorage).GetUnexportedEntries(ctx, em.config.BatchSize)
if err != nil {
slog.Error("Failed to get unexported entries", "error", err)
return
}
if len(entries) == 0 {
return
}
em.mu.RLock()
exporters := make(map[string]ExporterInterface)
maps.Copy(exporters, em.exporters)
em.mu.RUnlock()
successfulExports := make(map[string]bool)
for name, exporter := range exporters {
err := em.exportWithRetry(ctx, name, exporter, entries)
if err != nil {
slog.Error("Failed to export to target", "target", name, "error", err)
successfulExports[name] = false
} else {
slog.Debug("Successfully exported batch", "target", name, "count", len(entries))
successfulExports[name] = true
}
}
hasSuccess := false
for _, success := range successfulExports {
if success {
hasSuccess = true
break
}
}
if hasSuccess {
ids := make([]int64, len(entries))
for i, entry := range entries {
if id, ok := entry.Fields["_internal_id"].(int64); ok {
ids[i] = id
}
}
backoff := time.Duration(0)
var lastErr error
for attempt := 0; attempt <= em.config.RetryAttempts; attempt++ {
if attempt > 0 {
backoff = time.Duration(attempt) * em.config.RetryBackoff
time.Sleep(backoff)
}
err := em.storage.(*SQLiteStorage).MarkAsExported(ctx, ids)
if err == nil {
break
}
lastErr = err
if strings.Contains(err.Error(), "database is locked") {
continue
} else {
slog.Error("Failed to mark entries as exported", "error", err)
break
}
}
if lastErr != nil && backoff > 0 {
slog.Error("Failed to mark entries as exported after retries", "error", lastErr)
}
// if err := em.storage.(*SQLiteStorage).MarkAsExported(ctx, ids); err != nil {
// if strings.Contains(err.Error(), "database is locked") {
// time.Sleep(50 * time.Millisecond)
// err := em.storage.(*SQLiteStorage).MarkAsExported(ctx, ids)
// if err != nil {
// slog.Error("Failed to mark entries as exported", "error", err)
// }
// }
// slog.Error("Failed to mark entries as exported", "error", err)
// }
}
}
func (em *ExportManager) exportWithRetry(ctx context.Context, name string, exporter ExporterInterface, entries []LogEntry) error {
var lastErr error
for attempt := 0; attempt <= em.config.RetryAttempts; attempt++ {
if attempt > 0 {
backoff := time.Duration(attempt) * em.config.RetryBackoff
slog.Debug("Retrying export", "target", name, "attempt", attempt, "backoff", backoff)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(backoff):
}
}
err := exporter.Export(ctx, entries)
if err == nil {
if attempt > 0 {
slog.Info("Export succeeded after retry", "target", name, "attempt", attempt)
}
return nil
}
lastErr = err
slog.Warn("Export attempt failed", "target", name, "attempt", attempt, "error", err)
}
return fmt.Errorf("export failed after %d attempts: %w", em.config.RetryAttempts, lastErr)
}
func (em *ExportManager) performHealthChecks(ctx context.Context) {
em.mu.RLock()
exporters := make(map[string]ExporterInterface)
maps.Copy(exporters, em.exporters)
em.mu.RUnlock()
for name, exporter := range exporters {
err := exporter.HealthCheck(ctx)
if err != nil {
slog.Warn("Health check failed", "target", name, "error", err)
} else {
slog.Debug("Health check passed", "target", name)
}
}
}