package main import ( "context" "fmt" "log/slog" "maps" "strings" "sync" "time" "tixel_watch/models" ) type ExportManager struct { storage StorageInterface exporters map[string]ExporterInterface config ExportManagerConfig mu sync.RWMutex } type ExportManagerConfig struct { BatchSize int ExportInterval time.Duration RetryAttempts int RetryBackoff time.Duration HealthCheckInterval time.Duration } func NewExportManager(storage StorageInterface, config ExportManagerConfig) *ExportManager { return &ExportManager{ storage: storage, exporters: make(map[string]ExporterInterface), config: config, } } func (em *ExportManager) RegisterExporter(name string, exporter ExporterInterface) { em.mu.Lock() defer em.mu.Unlock() em.exporters[name] = exporter slog.Info("Exporter registered", "name", name) } func (em *ExportManager) UnregisterExporter(name string) { em.mu.Lock() defer em.mu.Unlock() delete(em.exporters, name) slog.Info("Exporter unregistered", "name", name) } func (em *ExportManager) Start(ctx context.Context) { exportTicker := time.NewTicker(em.config.ExportInterval) defer exportTicker.Stop() healthTicker := time.NewTicker(em.config.HealthCheckInterval) defer healthTicker.Stop() slog.Info("Export manager started", "batch_size", em.config.BatchSize, "export_interval", em.config.ExportInterval, ) for { select { case <-ctx.Done(): slog.Info("Export manager stopped") return case <-exportTicker.C: em.exportBatch(ctx) case <-healthTicker.C: em.performHealthChecks(ctx) } } } func (em *ExportManager) exportBatch(ctx context.Context) { entries, err := em.storage.(*SQLiteStorage).GetUnexportedEntries(ctx, em.config.BatchSize) if err != nil { slog.Error("Failed to get unexported entries", "error", err) return } if len(entries) == 0 { return } em.mu.RLock() exporters := make(map[string]ExporterInterface) maps.Copy(exporters, em.exporters) em.mu.RUnlock() successfulExports := make(map[string]bool) for name, exporter := range exporters { err := em.exportWithRetry(ctx, name, exporter, entries) if err != nil { slog.Error("Failed to export to target", "target", name, "error", err) successfulExports[name] = false } else { slog.Debug("Successfully exported batch", "target", name, "count", len(entries)) successfulExports[name] = true } } hasSuccess := false for _, success := range successfulExports { if success { hasSuccess = true break } } if hasSuccess { ids := make([]int64, len(entries)) for i, entry := range entries { if id, ok := entry.Fields["_internal_id"].(int64); ok { ids[i] = id } } backoff := time.Duration(0) var lastErr error for attempt := 0; attempt <= em.config.RetryAttempts; attempt++ { if attempt > 0 { backoff = time.Duration(attempt) * em.config.RetryBackoff time.Sleep(backoff) } err := em.storage.(*SQLiteStorage).MarkAsExported(ctx, ids) if err == nil { break } lastErr = err if strings.Contains(err.Error(), "database is locked") { continue } else { slog.Error("Failed to mark entries as exported", "error", err) break } } if lastErr != nil && backoff > 0 { slog.Error("Failed to mark entries as exported after retries", "error", lastErr) } } } func (em *ExportManager) exportWithRetry(ctx context.Context, name string, exporter ExporterInterface, entries []models.LogMessage) error { var lastErr error for attempt := 0; attempt <= em.config.RetryAttempts; attempt++ { if attempt > 0 { backoff := time.Duration(attempt) * em.config.RetryBackoff slog.Debug("Retrying export", "target", name, "attempt", attempt, "backoff", backoff) select { case <-ctx.Done(): return ctx.Err() case <-time.After(backoff): } } err := exporter.Export(ctx, entries) if err == nil { if attempt > 0 { slog.Info("Export succeeded after retry", "target", name, "attempt", attempt) } return nil } lastErr = err slog.Warn("Export attempt failed", "target", name, "attempt", attempt, "error", err) } return fmt.Errorf("export failed after %d attempts: %w", em.config.RetryAttempts, lastErr) } func (em *ExportManager) performHealthChecks(ctx context.Context) { em.mu.RLock() exporters := make(map[string]ExporterInterface) maps.Copy(exporters, em.exporters) em.mu.RUnlock() for name, exporter := range exporters { err := exporter.HealthCheck(ctx) if err != nil { slog.Warn("Health check failed", "target", name, "error", err) } else { slog.Debug("Health check passed", "target", name) } } }