226 lines
4.9 KiB
Go
226 lines
4.9 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"slices"
|
|
"sort"
|
|
"time"
|
|
|
|
"github.com/elastic/go-elasticsearch/v7"
|
|
"github.com/shirou/gopsutil/cpu"
|
|
"github.com/shirou/gopsutil/disk"
|
|
"github.com/shirou/gopsutil/host"
|
|
"github.com/shirou/gopsutil/mem"
|
|
"github.com/shirou/gopsutil/net"
|
|
"github.com/shirou/gopsutil/process"
|
|
)
|
|
|
|
type SystemMetricsCollector struct {
|
|
config SystemMetrics
|
|
pollInterval int
|
|
}
|
|
|
|
func NewSystemMetricsCollector(config SystemMetrics, pollInterval int) *SystemMetricsCollector {
|
|
return &SystemMetricsCollector{
|
|
config: config,
|
|
pollInterval: pollInterval,
|
|
}
|
|
}
|
|
|
|
func (smc *SystemMetricsCollector) Start(ctx context.Context, es *elasticsearch.Client, baseIndex string) {
|
|
ticker := time.NewTicker(time.Duration(smc.pollInterval) * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
sender := NewElasticsearchSender(es)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
slog.Info("System metrics collector stopped")
|
|
return
|
|
case <-ticker.C:
|
|
metrics, err := smc.collectMetrics()
|
|
if err != nil {
|
|
slog.Error("error collecting system metrics", "error", err)
|
|
continue
|
|
}
|
|
|
|
if err := sender.SendSystemMetrics(baseIndex, metrics); err != nil {
|
|
slog.Error("error sending system metrics", "error", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (smc *SystemMetricsCollector) collectMetrics() (SystemResources, error) {
|
|
result := NewSystemResources()
|
|
|
|
var err error
|
|
|
|
if smc.config.CollectCPU {
|
|
if err = smc.collectCPUMetrics(&result); err != nil {
|
|
return result, fmt.Errorf("CPU metrics: %w", err)
|
|
}
|
|
}
|
|
|
|
if smc.config.CollectMemory {
|
|
if err = smc.collectMemoryMetrics(&result); err != nil {
|
|
return result, fmt.Errorf("memory metrics: %w", err)
|
|
}
|
|
}
|
|
|
|
if smc.config.CollectDisk {
|
|
if err = smc.collectDiskMetrics(&result); err != nil {
|
|
return result, fmt.Errorf("disk metrics: %w", err)
|
|
}
|
|
}
|
|
|
|
if smc.config.CollectNetwork {
|
|
if err = smc.collectNetworkMetrics(&result); err != nil {
|
|
return result, fmt.Errorf("network metrics: %w", err)
|
|
}
|
|
}
|
|
|
|
if smc.config.CollectProcesses {
|
|
if err := smc.collectProcessMetrics(&result); err != nil {
|
|
slog.Warn("failed to collect process metrics", "error", err)
|
|
}
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (smc *SystemMetricsCollector) collectProcessMetrics(result *SystemResources) error {
|
|
processes, err := process.Processes()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var processInfos []ProcessInfo
|
|
var totalOpenFiles int32
|
|
|
|
for _, p := range processes {
|
|
name, err := p.Name()
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
cpuPercent, err := p.CPUPercent()
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
memInfo, err := p.MemoryInfo()
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
status, err := p.Status()
|
|
if err != nil {
|
|
status = ""
|
|
}
|
|
|
|
createTime, err := p.CreateTime()
|
|
if err != nil {
|
|
createTime = 0
|
|
}
|
|
|
|
if openFiles, err := p.NumFDs(); err == nil {
|
|
totalOpenFiles += openFiles
|
|
}
|
|
|
|
processInfos = append(processInfos, ProcessInfo{
|
|
PID: p.Pid,
|
|
Name: name,
|
|
CPUPercent: cpuPercent,
|
|
MemoryMB: float32(memInfo.RSS) / 1024 / 1024,
|
|
Status: status,
|
|
CreateTime: createTime,
|
|
})
|
|
}
|
|
|
|
sort.Slice(processInfos, func(i, j int) bool {
|
|
return processInfos[i].CPUPercent > processInfos[j].CPUPercent
|
|
})
|
|
|
|
limit := smc.config.TopProcessesLimit
|
|
if len(processInfos) > limit {
|
|
processInfos = processInfos[:limit]
|
|
}
|
|
|
|
result.TopProcesses = processInfos
|
|
result.OpenFileDescriptors = totalOpenFiles
|
|
|
|
return nil
|
|
}
|
|
|
|
func (smc *SystemMetricsCollector) collectCPUMetrics(result *SystemResources) error {
|
|
cpuPercents, err := cpu.Percent(time.Second, false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(cpuPercents) > 0 {
|
|
result.CPUPercent = cpuPercents[0]
|
|
}
|
|
|
|
if hostStat, err := host.Info(); err == nil {
|
|
result.Uptime = hostStat.Uptime
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (smc *SystemMetricsCollector) collectMemoryMetrics(result *SystemResources) error {
|
|
vmStat, err := mem.VirtualMemory()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
result.MemoryUsed = vmStat.Used
|
|
result.MemoryTotal = vmStat.Total
|
|
result.MemoryPercent = vmStat.UsedPercent
|
|
|
|
return nil
|
|
}
|
|
|
|
func (smc *SystemMetricsCollector) collectDiskMetrics(result *SystemResources) error {
|
|
for _, path := range smc.config.DiskPaths {
|
|
diskStat, err := disk.Usage(path)
|
|
if err != nil {
|
|
slog.Error("error reading disk stats", "path", path, "error", err)
|
|
continue
|
|
}
|
|
|
|
result.DiskUsage[path] = DiskUsage{
|
|
Used: diskStat.Used,
|
|
Total: diskStat.Total,
|
|
UsedPercent: diskStat.UsedPercent,
|
|
Free: diskStat.Free,
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (smc *SystemMetricsCollector) collectNetworkMetrics(result *SystemResources) error {
|
|
netStats, err := net.IOCounters(true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, stat := range netStats {
|
|
if len(smc.config.NetworkInterfaces) == 0 || slices.Contains(smc.config.NetworkInterfaces, stat.Name) {
|
|
result.NetworkStats[stat.Name] = NetworkStat{
|
|
BytesSent: stat.BytesSent,
|
|
BytesRecv: stat.BytesRecv,
|
|
PacketsSent: stat.PacketsSent,
|
|
PacketsRecv: stat.PacketsRecv,
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|