guenther/internal/health/monitor.go

111 lines
2.9 KiB
Go

package health
import (
	"context"
	"encoding/json"
	"log"
	"maps"
	"slices"
	"sync"
	"time"

	"codeberg.org/pata1704/guenther/pkg/types"
)
// HealthMonitor collects StageHealth snapshots from pipeline stages and
// periodically prints a JSON report to the standard logger.
//
// Stages submit updates on the send-only channel returned by Chan(). The
// channel is buffered (capacity 100), so a send completes immediately while
// there is room in the buffer. Once the buffer is full, senders block. This
// happens, for example, when the collector goroutine has not been started yet
// or has already exited. Stages that must never stall should send inside a
// select with a default or ctx.Done() case.
//
// The channel field itself is unexported, and Chan() hands out only the
// send-only direction, so stages cannot receive from it. Go still permits
// close and cap on a send-only channel, so the type system does not stop a
// stage from closing it. Stages should not close it; the monitor never does.
type HealthMonitor struct {
	// healthChan carries updates from stages to the collector goroutine.
	healthChan chan types.StageHealth
	// mu guards stages.
	mu sync.Mutex
	// stages holds the most recent update received for each StageName.
	stages map[string]*types.StageHealth
	// wg tracks the collector goroutine launched by Start.
	wg sync.WaitGroup
}
// NewHealthMonitor returns a HealthMonitor with an empty stage table and a
// buffered (capacity 100) update channel. Nothing is collected or reported
// until Start is called.
func NewHealthMonitor() *HealthMonitor {
	m := new(HealthMonitor)
	m.stages = make(map[string]*types.StageHealth)
	m.healthChan = make(chan types.StageHealth, 100)
	return m
}
// Chan exposes the send-only end of the monitor's update channel. Pipeline
// stages push StageHealth values into it. The monitor never closes the
// channel, and senders should not close it either.
func (m *HealthMonitor) Chan() chan<- types.StageHealth {
	var sendOnly chan<- types.StageHealth = m.healthChan
	return sendOnly
}
// Start launches the collection goroutine and returns immediately.
//
// The goroutine stores every update received on the channel returned by Chan.
// Updates are keyed by StageName, so the newest update for a stage replaces
// the previous one. It also logs a report every interval (typically 5 s).
// interval must be positive, because time.NewTicker panics otherwise.
//
// When ctx is cancelled, the goroutine records any updates still buffered in
// the channel and then exits. No final report is printed. Use Wait to block
// until it has exited.
//
// If a sender closes the channel, the goroutine stops receiving from it but
// keeps reporting until ctx is cancelled.
func (m *HealthMonitor) Start(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	m.wg.Go(func() {
		defer ticker.Stop()
		updates := m.healthChan
		for {
			select {
			case h, ok := <-updates:
				if !ok {
					// A closed channel is always ready to receive, so keeping it
					// in the select would spin forever storing zero values.
					// A nil channel is never ready, which disables this case.
					updates = nil
					continue
				}
				m.store(h)
			case <-ticker.C:
				m.printReport()
			case <-ctx.Done():
				m.drain()
				return
			}
		}
	})
}

// store records h as the latest health snapshot for its stage.
func (m *HealthMonitor) store(h types.StageHealth) {
	m.mu.Lock()
	defer m.mu.Unlock()
	// h is a per-call copy, so the map can safely own its address.
	m.stages[h.StageName] = &h
}

// drain records every update currently buffered in healthChan without
// blocking. It returns when the buffer is empty or the channel is closed.
func (m *HealthMonitor) drain() {
	for {
		select {
		case h, ok := <-m.healthChan:
			if !ok {
				// Closed and empty: without this check the loop would never
				// reach default and would never terminate.
				return
			}
			m.store(h)
		default:
			return
		}
	}
}
// Wait blocks until the collection goroutine launched by Start has returned.
// That goroutine returns only after the context passed to Start is cancelled.
// If Start was never called, the wait group is empty and Wait returns
// immediately.
func (m *HealthMonitor) Wait() {
	wg := &m.wg
	wg.Wait()
}
// Snapshot copies every stage record into a new map keyed by stage name and
// returns it. The copy is taken under the monitor's lock, so the result can be
// read or modified without racing the collector goroutine. Records are copied
// by value, which is a shallow copy: if StageHealth holds slices or maps, those
// are still shared.
//
// Snapshot is useful for tests and metrics endpoints.
func (m *HealthMonitor) Snapshot() map[string]types.StageHealth {
	m.mu.Lock()
	defer m.mu.Unlock()

	copied := make(map[string]types.StageHealth, len(m.stages))
	for name, rec := range m.stages {
		copied[name] = *rec
	}
	return copied
}
// printReport logs one JSON line per known stage, framed by a header rule and
// a footer rule. Stages are listed in name order so consecutive reports line
// up. Ranging over the map directly would print them in random order.
//
// The records are copied via Snapshot, so the monitor's lock is not held
// while JSON encoding and log I/O run. In this file the stage table is written
// only by the collector goroutine, which is also the caller here, so the copy
// is as current as an in-lock iteration would be.
func (m *HealthMonitor) printReport() {
	snap := m.Snapshot()

	log.Println("── Pipeline Health ──────────────────────────────")
	for _, name := range slices.Sorted(maps.Keys(snap)) {
		h := snap[name]
		// Marshal through a pointer, as the original code did, so that a
		// pointer-receiver MarshalJSON on StageHealth (if any) still applies.
		b, err := json.Marshal(&h)
		if err != nil {
			log.Printf("[%s] marshal error: %v", h.StageName, err)
			continue
		}
		log.Printf("[%s] %s", h.StageName, b)
	}
	log.Println("─────────────────────────────────────────────────")
}