package health import ( "context" "encoding/json" "log" "sync" "time" "codeberg.org/pata1704/guenther/pkg/types" ) // HealthMonitor collects StageHealth snapshots from pipeline stages and // periodically prints a JSON report to the standard logger. // // Stages write to the channel returned by Chan(). The channel is buffered // (capacity 100) so health updates never block the sending stage. // // The channel is intentionally private (accessed via Chan()) so that callers // cannot close it from outside and cannot see the internal buffer size. type HealthMonitor struct { healthChan chan types.StageHealth mu sync.Mutex stages map[string]*types.StageHealth wg sync.WaitGroup } // NewHealthMonitor allocates a HealthMonitor. Call Start to begin processing. func NewHealthMonitor() *HealthMonitor { return &HealthMonitor{ healthChan: make(chan types.StageHealth, 100), stages: make(map[string]*types.StageHealth), } } // Chan returns the write-only channel that pipeline stages use to submit // health updates. The channel remains open for the lifetime of the monitor. func (m *HealthMonitor) Chan() chan<- types.StageHealth { return m.healthChan } // Start begins the health collection loop and periodic reporting. // interval controls how often the report is printed (typically 5 s). func (m *HealthMonitor) Start(ctx context.Context, interval time.Duration) { ticker := time.NewTicker(interval) m.wg.Go(func() { defer ticker.Stop() for { select { case h := <-m.healthChan: m.mu.Lock() // Shallow copy so the map owns the value. snap := h m.stages[h.StageName] = &snap m.mu.Unlock() case <-ticker.C: m.printReport() case <-ctx.Done(): // Drain remaining updates before exiting. for { select { case h := <-m.healthChan: m.mu.Lock() snap := h m.stages[h.StageName] = &snap m.mu.Unlock() default: return } } } } }) } // Wait waits for the health monitor goroutine to exit after context cancellation. 
func (m *HealthMonitor) Wait() {
	m.wg.Wait()
}

// Snapshot returns a point-in-time copy of all stage health records, keyed
// by stage name. The returned map is owned by the caller.
// Useful for tests and metrics endpoints.
func (m *HealthMonitor) Snapshot() map[string]types.StageHealth {
	m.mu.Lock()
	defer m.mu.Unlock()

	out := make(map[string]types.StageHealth, len(m.stages))
	for name, rec := range m.stages {
		out[name] = *rec
	}
	return out
}

// printReport writes one JSON line per known stage to the standard logger,
// framed by a header and a footer line. Stages are listed in map iteration
// order, which is not stable between reports.
func (m *HealthMonitor) printReport() {
	// Copy the records under the lock, then marshal and log without it so a
	// slow log sink cannot stall Snapshot callers.
	records := m.Snapshot()

	log.Println("── Pipeline Health ──────────────────────────────")
	for _, rec := range records {
		// Marshal through a pointer, matching how the records are stored, so
		// the same encoding path is used as when marshalling the map entry.
		b, err := json.Marshal(&rec)
		if err != nil {
			log.Printf("[%s] marshal error: %v", rec.StageName, err)
			continue
		}
		log.Printf("[%s] %s", rec.StageName, b)
	}
	log.Println("─────────────────────────────────────────────────")
}