191 lines
4.5 KiB
Go
191 lines
4.5 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"maps"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type ExportManager struct {
|
|
storage StorageInterface
|
|
exporters map[string]ExporterInterface
|
|
config ExportManagerConfig
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// ExportManagerConfig holds the tunables of an ExportManager.
type ExportManagerConfig struct {
	// BatchSize is the limit passed to storage when fetching unexported
	// entries for a single export pass.
	BatchSize int

	// ExportInterval is the period of the export ticker.
	ExportInterval time.Duration

	// RetryAttempts is the number of additional attempts made after the
	// first one fails.
	RetryAttempts int

	// RetryBackoff is the base delay between attempts; the wait before
	// attempt k is k * RetryBackoff.
	RetryBackoff time.Duration

	// HealthCheckInterval is the period of the exporter health-check ticker.
	HealthCheckInterval time.Duration
}
|
|
|
|
func NewExportManager(storage StorageInterface, config ExportManagerConfig) *ExportManager {
|
|
return &ExportManager{
|
|
storage: storage,
|
|
exporters: make(map[string]ExporterInterface),
|
|
config: config,
|
|
}
|
|
}
|
|
|
|
func (em *ExportManager) RegisterExporter(name string, exporter ExporterInterface) {
|
|
em.mu.Lock()
|
|
defer em.mu.Unlock()
|
|
em.exporters[name] = exporter
|
|
slog.Info("Exporter registered", "name", name)
|
|
}
|
|
|
|
func (em *ExportManager) UnregisterExporter(name string) {
|
|
em.mu.Lock()
|
|
defer em.mu.Unlock()
|
|
delete(em.exporters, name)
|
|
slog.Info("Exporter unregistered", "name", name)
|
|
}
|
|
|
|
func (em *ExportManager) Start(ctx context.Context) {
|
|
exportTicker := time.NewTicker(em.config.ExportInterval)
|
|
defer exportTicker.Stop()
|
|
|
|
healthTicker := time.NewTicker(em.config.HealthCheckInterval)
|
|
defer healthTicker.Stop()
|
|
|
|
slog.Info("Export manager started",
|
|
"batch_size", em.config.BatchSize,
|
|
"export_interval", em.config.ExportInterval,
|
|
)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
slog.Info("Export manager stopped")
|
|
return
|
|
case <-exportTicker.C:
|
|
em.exportBatch(ctx)
|
|
case <-healthTicker.C:
|
|
em.performHealthChecks(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (em *ExportManager) exportBatch(ctx context.Context) {
|
|
entries, err := em.storage.(*SQLiteStorage).GetUnexportedEntries(ctx, em.config.BatchSize)
|
|
if err != nil {
|
|
slog.Error("Failed to get unexported entries", "error", err)
|
|
return
|
|
}
|
|
|
|
if len(entries) == 0 {
|
|
return
|
|
}
|
|
|
|
em.mu.RLock()
|
|
exporters := make(map[string]ExporterInterface)
|
|
maps.Copy(exporters, em.exporters)
|
|
em.mu.RUnlock()
|
|
|
|
successfulExports := make(map[string]bool)
|
|
|
|
for name, exporter := range exporters {
|
|
err := em.exportWithRetry(ctx, name, exporter, entries)
|
|
if err != nil {
|
|
slog.Error("Failed to export to target", "target", name, "error", err)
|
|
successfulExports[name] = false
|
|
} else {
|
|
slog.Debug("Successfully exported batch", "target", name, "count", len(entries))
|
|
successfulExports[name] = true
|
|
}
|
|
}
|
|
|
|
hasSuccess := false
|
|
for _, success := range successfulExports {
|
|
if success {
|
|
hasSuccess = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if hasSuccess {
|
|
ids := make([]int64, len(entries))
|
|
for i, entry := range entries {
|
|
if id, ok := entry.Fields["_internal_id"].(int64); ok {
|
|
ids[i] = id
|
|
}
|
|
}
|
|
|
|
backoff := time.Duration(0)
|
|
var lastErr error
|
|
|
|
for attempt := 0; attempt <= em.config.RetryAttempts; attempt++ {
|
|
if attempt > 0 {
|
|
backoff = time.Duration(attempt) * em.config.RetryBackoff
|
|
time.Sleep(backoff)
|
|
}
|
|
err := em.storage.(*SQLiteStorage).MarkAsExported(ctx, ids)
|
|
if err == nil {
|
|
break
|
|
}
|
|
lastErr = err
|
|
if strings.Contains(err.Error(), "database is locked") {
|
|
continue
|
|
} else {
|
|
slog.Error("Failed to mark entries as exported", "error", err)
|
|
break
|
|
}
|
|
}
|
|
|
|
if lastErr != nil && backoff > 0 {
|
|
slog.Error("Failed to mark entries as exported after retries", "error", lastErr)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (em *ExportManager) exportWithRetry(ctx context.Context, name string, exporter ExporterInterface, entries []LogEntry) error {
|
|
var lastErr error
|
|
|
|
for attempt := 0; attempt <= em.config.RetryAttempts; attempt++ {
|
|
if attempt > 0 {
|
|
backoff := time.Duration(attempt) * em.config.RetryBackoff
|
|
slog.Debug("Retrying export", "target", name, "attempt", attempt, "backoff", backoff)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-time.After(backoff):
|
|
}
|
|
}
|
|
|
|
err := exporter.Export(ctx, entries)
|
|
if err == nil {
|
|
if attempt > 0 {
|
|
slog.Info("Export succeeded after retry", "target", name, "attempt", attempt)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
lastErr = err
|
|
slog.Warn("Export attempt failed", "target", name, "attempt", attempt, "error", err)
|
|
}
|
|
|
|
return fmt.Errorf("export failed after %d attempts: %w", em.config.RetryAttempts, lastErr)
|
|
}
|
|
|
|
func (em *ExportManager) performHealthChecks(ctx context.Context) {
|
|
em.mu.RLock()
|
|
exporters := make(map[string]ExporterInterface)
|
|
maps.Copy(exporters, em.exporters)
|
|
em.mu.RUnlock()
|
|
|
|
for name, exporter := range exporters {
|
|
err := exporter.HealthCheck(ctx)
|
|
if err != nil {
|
|
slog.Warn("Health check failed", "target", name, "error", err)
|
|
} else {
|
|
slog.Debug("Health check passed", "target", name)
|
|
}
|
|
}
|
|
}
|