111 lines
2.9 KiB
Go
111 lines
2.9 KiB
Go
package health
|
|
|
|
import (
	"context"
	"encoding/json"
	"log"
	"maps"
	"slices"
	"sync"
	"time"

	"codeberg.org/pata1704/guenther/pkg/types"
)
|
|
|
|
// HealthMonitor collects StageHealth snapshots from pipeline stages and
|
|
// periodically prints a JSON report to the standard logger.
|
|
//
|
|
// Stages write to the channel returned by Chan(). The channel is buffered
|
|
// (capacity 100) so health updates never block the sending stage.
|
|
//
|
|
// The channel is intentionally private (accessed via Chan()) so that callers
|
|
// cannot close it from outside and cannot see the internal buffer size.
|
|
type HealthMonitor struct {
|
|
healthChan chan types.StageHealth
|
|
|
|
mu sync.Mutex
|
|
stages map[string]*types.StageHealth
|
|
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// NewHealthMonitor allocates a HealthMonitor. Call Start to begin processing.
|
|
func NewHealthMonitor() *HealthMonitor {
|
|
return &HealthMonitor{
|
|
healthChan: make(chan types.StageHealth, 100),
|
|
stages: make(map[string]*types.StageHealth),
|
|
}
|
|
}
|
|
|
|
// Chan returns the write-only channel that pipeline stages use to submit
|
|
// health updates. The channel remains open for the lifetime of the monitor.
|
|
func (m *HealthMonitor) Chan() chan<- types.StageHealth {
|
|
return m.healthChan
|
|
}
|
|
|
|
// Start launches the collection goroutine and returns immediately.
//
// The goroutine stores each update received on the health channel as the
// latest record for its stage, and prints a report every interval (typically
// 5s). interval must be positive, because time.NewTicker panics otherwise.
//
// When ctx is cancelled, the goroutine first records any updates already
// buffered in the channel so they are visible to Snapshot. It then exits
// without printing a final report. Use Wait to block until it has exited.
func (m *HealthMonitor) Start(ctx context.Context, interval time.Duration) {
	// Create the ticker before spawning the goroutine so that an invalid
	// interval panics in the caller instead of in the background.
	ticker := time.NewTicker(interval)
	m.wg.Go(func() {
		defer ticker.Stop()
		for {
			select {
			case h := <-m.healthChan:
				m.recordHealth(h)
			case <-ticker.C:
				m.printReport()
			case <-ctx.Done():
				m.drainPending()
				return
			}
		}
	})
}

// recordHealth stores h as the latest record for its stage. h is a per-call
// copy, so the map can safely keep its address.
func (m *HealthMonitor) recordHealth(h types.StageHealth) {
	m.mu.Lock()
	m.stages[h.StageName] = &h
	m.mu.Unlock()
}

// drainPending records every update currently buffered in the health channel
// without blocking. It returns as soon as the channel is empty.
func (m *HealthMonitor) drainPending() {
	for {
		select {
		case h := <-m.healthChan:
			m.recordHealth(h)
		default:
			return
		}
	}
}
|
|
|
|
// Wait waits for the health monitor goroutine to exit after context cancellation.
|
|
func (m *HealthMonitor) Wait() {
|
|
m.wg.Wait()
|
|
}
|
|
|
|
// Snapshot returns a point-in-time copy of all stage health records.
|
|
// Useful for tests and metrics endpoints.
|
|
func (m *HealthMonitor) Snapshot() map[string]types.StageHealth {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
out := make(map[string]types.StageHealth, len(m.stages))
|
|
for k, v := range m.stages {
|
|
out[k] = *v
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (m *HealthMonitor) printReport() {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
log.Println("── Pipeline Health ──────────────────────────────")
|
|
for _, h := range m.stages {
|
|
b, err := json.Marshal(h)
|
|
if err != nil {
|
|
log.Printf("[%s] marshal error: %v", h.StageName, err)
|
|
continue
|
|
}
|
|
log.Printf("[%s] %s", h.StageName, b)
|
|
}
|
|
log.Println("─────────────────────────────────────────────────")
|
|
}
|