feat(system-metrics,service-metrics,elastic): update elastic-client to version 8, improve parser logic to add more information, improve system monitor to add more information

This commit is contained in:
Patryk Hegenberg 2025-09-15 08:25:20 +02:00
parent 6c098ed61c
commit 159df116c8
10 changed files with 587 additions and 190 deletions

View file

@ -44,15 +44,24 @@ type ElasticsearchConfig struct {
} }
type SystemMetrics struct { type SystemMetrics struct {
Enabled bool `mapstructure:"enabled"` Enabled bool `mapstructure:"enabled"`
CollectCPU bool `mapstructure:"collect_cpu"` CollectCPU bool `mapstructure:"collect_cpu"`
CollectMemory bool `mapstructure:"collect_memory"` CollectMemory bool `mapstructure:"collect_memory"`
CollectDisk bool `mapstructure:"collect_disk"` CollectDisk bool `mapstructure:"collect_disk"`
CollectNetwork bool `mapstructure:"collect_network"` CollectNetwork bool `mapstructure:"collect_network"`
CollectProcesses bool `mapstructure:"collect_processes"` CollectProcesses bool `mapstructure:"collect_processes"`
DiskPaths []string `mapstructure:"disk_paths"` DiskPaths []string `mapstructure:"disk_paths"`
NetworkInterfaces []string `mapstructure:"network_interfaces"` NetworkInterfaces []string `mapstructure:"network_interfaces"`
TopProcessesLimit int `mapstructure:"top_processes_limit"` TopProcessesLimit int `mapstructure:"top_processes_limit"`
CollectNetworkConnections bool `mapstructure:"collect_network_connections"`
CollectLoadAverage bool `mapstructure:"collect_load_average"`
CollectTCPStats bool `mapstructure:"collect_tcp_stats"`
CollectFileHandles bool `mapstructure:"collect_filehandles"`
CollectDiskIO bool `mapstructure:"collect_disk_io"`
CollectNetworkLatency bool `mapstructure:"collect_network_latency"`
CollectBandwidthUsage bool `mapstructure:"collect_bandwidth_usage"`
TransferPorts []int `mapstructure:"transfer_ports"`
LatencyTestHosts []string `mapstructure:"latency_test_hosts"`
} }
type Config struct { type Config struct {

View file

@ -8,7 +8,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v8"
) )
func NewElasticsearchClient(config ElasticsearchConfig) (*elasticsearch.Client, error) { func NewElasticsearchClient(config ElasticsearchConfig) (*elasticsearch.Client, error) {

View file

@ -9,7 +9,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v8"
) )
type ElasticsearchExporter struct { type ElasticsearchExporter struct {

10
go.mod
View file

@ -3,14 +3,18 @@ module tixel_watch
go 1.24.1 go 1.24.1
require ( require (
github.com/elastic/go-elasticsearch/v7 v7.17.10 github.com/elastic/go-elasticsearch/v8 v8.19.0
github.com/hpcloud/tail v1.0.0 github.com/hpcloud/tail v1.0.0
github.com/shirou/gopsutil v3.21.11+incompatible github.com/shirou/gopsutil v3.21.11+incompatible
github.com/spf13/viper v1.20.1 github.com/spf13/viper v1.20.1
golang.org/x/sys v0.31.0
) )
require ( require (
github.com/elastic/elastic-transport-go/v8 v8.7.0 // indirect
github.com/fsnotify/fsnotify v1.8.0 // indirect github.com/fsnotify/fsnotify v1.8.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect
@ -23,9 +27,11 @@ require (
github.com/tklauser/go-sysconf v0.3.15 // indirect github.com/tklauser/go-sysconf v0.3.15 // indirect
github.com/tklauser/numcpus v0.10.0 // indirect github.com/tklauser/numcpus v0.10.0 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opentelemetry.io/otel v1.29.0 // indirect
go.opentelemetry.io/otel/metric v1.29.0 // indirect
go.opentelemetry.io/otel/trace v1.29.0 // indirect
go.uber.org/atomic v1.9.0 // indirect go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/text v0.21.0 // indirect golang.org/x/text v0.21.0 // indirect
gopkg.in/fsnotify.v1 v1.4.7 // indirect gopkg.in/fsnotify.v1 v1.4.7 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect

21
go.sum
View file

@ -1,18 +1,27 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= github.com/elastic/elastic-transport-go/v8 v8.7.0 h1:OgTneVuXP2uip4BA658Xi6Hfw+PeIOod2rY3GVMGoVE=
github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= github.com/elastic/elastic-transport-go/v8 v8.7.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
github.com/elastic/go-elasticsearch/v8 v8.19.0 h1:VmfBLNRORY7RZL+9hTxBD97ehl9H8Nxf2QigDh6HuMU=
github.com/elastic/go-elasticsearch/v8 v8.19.0/go.mod h1:F3j9e+BubmKvzvLjNui/1++nJuJxbkhHefbaT0kFKGY=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M= github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M=
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss=
github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
@ -51,6 +60,14 @@ github.com/tklauser/numcpus v0.10.0 h1:18njr6LDBk1zuna922MgdjQuJFjrdppsZG60sHGfj
github.com/tklauser/numcpus v0.10.0/go.mod h1:BiTKazU708GQTYF4mB+cmlpT2Is1gLk7XVuEeem8LsQ= github.com/tklauser/numcpus v0.10.0/go.mod h1:BiTKazU708GQTYF4mB+cmlpT2Is1gLk7XVuEeem8LsQ=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw=
go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8=
go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc=
go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8=
go.opentelemetry.io/otel/sdk v1.29.0 h1:vkqKjk7gwhS8VaWb0POZKmIEDimRCMsopNYnriHyryo=
go.opentelemetry.io/otel/sdk v1.29.0/go.mod h1:pM8Dx5WKnvxLCb+8lG1PRNIDxu9g9b9g59Qr7hfAAok=
go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4=
go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=

View file

@ -5,7 +5,7 @@ import (
"log/slog" "log/slog"
"time" "time"
"github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v8"
) )
type LogProcessor struct { type LogProcessor struct {

108
models.go
View file

@ -5,19 +5,88 @@ import (
) )
type SystemResources struct { type SystemResources struct {
Timestamp time.Time `json:"@timestamp"` Timestamp time.Time `json:"@timestamp"`
Type string `json:"type"` Type string `json:"type"`
Host string `json:"host"` Host string `json:"host"`
CPUPercent float64 `json:"cpu_percent,omitempty"` CPUPercent float64 `json:"cpu_percent,omitempty"`
MemoryUsed uint64 `json:"memory_used,omitempty"` MemoryUsed uint64 `json:"memory_used,omitempty"`
MemoryTotal uint64 `json:"memory_total,omitempty"` MemoryTotal uint64 `json:"memory_total,omitempty"`
MemoryPercent float64 `json:"memory_percent,omitempty"` MemoryPercent float64 `json:"memory_percent,omitempty"`
DiskUsage map[string]DiskUsage `json:"disk_usage,omitempty"` DiskUsage map[string]DiskUsage `json:"disk_usage,omitempty"`
NetworkStats map[string]NetworkStat `json:"network_stats,omitempty"` NetworkStats map[string]NetworkStat `json:"network_stats,omitempty"`
LoadAverage []float64 `json:"load_average,omitempty"` LoadAverage []float64 `json:"load_average,omitempty"`
Uptime uint64 `json:"uptime,omitempty"` Uptime uint64 `json:"uptime,omitempty"`
TopProcesses []ProcessInfo `json:"top_processes"` TopProcesses []ProcessInfo `json:"top_processes"`
OpenFileDescriptors int32 `json:"open_file_descriptors"` OpenFileDescriptors int32 `json:"open_file_descriptors"`
DiskIOStats map[string]DiskIOStat `json:"disk_io_stats,omitempty"`
NetworkLatency map[string]LatencyInfo `json:"network_latency,omitempty"`
BandwidthUtilization map[string]BandwidthInfo `json:"bandwidth_utilization,omitempty"`
NetworkConnections ConnectionStats `json:"network_connections"`
TCPStats TCPStatistics `json:"tcp_stats"`
SystemLimits SystemLimitInfo `json:"system_limits"`
}
type DiskIOStat struct {
ReadBytes uint64 `json:"read_bytes"`
WriteBytes uint64 `json:"write_bytes"`
ReadOps uint64 `json:"read_ops"`
WriteOps uint64 `json:"write_ops"`
ReadTime uint64 `json:"read_time"`
WriteTime uint64 `json:"write_time"`
IOUtilization float64 `json:"io_utilization"`
AvgReadLatency float64 `json:"avg_read_latency"`
AvgWriteLatency float64 `json:"avg_write_latency"`
}
type LatencyInfo struct {
Host string `json:"host"`
MinLatency time.Duration `json:"min_latency"`
MaxLatency time.Duration `json:"max_latency"`
AvgLatency time.Duration `json:"avg_latency"`
PacketLoss float64 `json:"packet_loss"`
Jitter time.Duration `json:"jitter"`
}
type BandwidthInfo struct {
Interface string `json:"interface"`
CurrentThroughputIn float64 `json:"current_throughput_in_mbps"`
CurrentThroughputOut float64 `json:"current_throughput_out_mbps"`
PeakThroughputIn float64 `json:"peak_throughput_in_mbps"`
PeakThroughputOut float64 `json:"peak_throughput_out_mbps"`
UtilizationPercent float64 `json:"utilization_percent"`
}
type ConnectionStats struct {
TotalConnections int32 `json:"total_connections"`
EstablishedTCP int32 `json:"established_tcp"`
ListeningTCP int32 `json:"listening_tcp"`
TimeWaitTCP int32 `json:"time_wait_tcp"`
TransferConnections int32 `json:"transfer_connections"`
ConnectionsByState map[string]int32 `json:"connections_by_state"`
}
type LoadAverageInfo struct {
Load1 float64 `json:"load_1min"`
Load5 float64 `json:"load_5min"`
Load15 float64 `json:"load_15min"`
}
type TCPStatistics struct {
RetransmitRate float64 `json:"retransmit_rate"`
OutOfOrderPackets uint64 `json:"out_of_order_packets"`
LostPackets uint64 `json:"lost_packets"`
CongestionWindow uint64 `json:"congestion_window"`
TCPMemoryUsage uint64 `json:"tcp_memory_usage"`
}
type SystemLimitInfo struct {
MaxOpenFiles uint64 `json:"max_open_files"`
CurrentOpenFiles uint64 `json:"current_open_files"`
MaxProcesses uint64 `json:"max_processes"`
CurrentProcesses uint64 `json:"current_processes"`
MaxTCPConnections uint64 `json:"max_tcp_connections"`
CurrentTCPConnections uint64 `json:"current_tcp_connections"`
FileDescriptorUsage float64 `json:"file_descriptor_usage_percent"`
} }
type DiskUsage struct { type DiskUsage struct {
@ -70,10 +139,13 @@ func NewLogEntry(entryType string) LogEntry {
func NewSystemResources() SystemResources { func NewSystemResources() SystemResources {
return SystemResources{ return SystemResources{
Timestamp: time.Now(), Timestamp: time.Now(),
Type: "system_metrics", Type: "system_metrics",
Host: hostname, Host: hostname,
DiskUsage: make(map[string]DiskUsage), DiskUsage: make(map[string]DiskUsage),
NetworkStats: make(map[string]NetworkStat), DiskIOStats: make(map[string]DiskIOStat),
NetworkStats: make(map[string]NetworkStat),
NetworkLatency: make(map[string]LatencyInfo),
BandwidthUtilization: make(map[string]BandwidthInfo),
} }
} }

View file

@ -226,13 +226,19 @@ var (
tjmTransferNamePattern = regexp.MustCompile(`^(\d{8}T\d{6}-[A-Za-z0-9]+-[A-Za-z]+-(?:in|out)) ?: (.*)$`) tjmTransferNamePattern = regexp.MustCompile(`^(\d{8}T\d{6}-[A-Za-z0-9]+-[A-Za-z]+-(?:in|out)) ?: (.*)$`)
tsServicePattern = regexp.MustCompile(`^(?<level>\S+)\s+(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6})\s+(?<message>.*)`) tsServicePattern = regexp.MustCompile(`^(?<level>\S+)\s+(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6})\s+(?<message>.*)`)
tsTransferIDPattern = regexp.MustCompile(`^(?<transfer>\w{8}-\w{4}-\w{4}-\w{4}-\w{12})\s+(?<message>.*)`) tsTransferIDPattern = regexp.MustCompile(`^(?<transfer>\w{8}-\w{4}-\w{4}-\w{4}-\w{12})\s+(?<message>.*)`)
tjmTransferIDPattern1 = regexp.MustCompile(`(?P<transfer>\w{8}-\w{4}-\w{4}-\w{4}-\w{12}).*?(?P<message>.*)`)
tjmTransferIDPattern2 = regexp.MustCompile(`(?P<before>.*)(?P<transfer>\w{8}-\w{4}-\w{4}-\w{4}-\w{12}).*?(?P<message>.*)`)
tsDetailPattern1 = regexp.MustCompile(`in: Transfer start (?P<thread>\d+/\d+) buffers=(?P<buffers>\d+) files=(?P<files>\d+) size=(?P<size>[0-9.]+) MByte chunksize=(?P<chunksize>\d+) streams=(?P<streams>\d+) target-datarate=(?P<target_datarate>[0-9.]+) MByte/s protocol=(?P<protocol>\w+) dest=(?P<dest>\S+) sender-id=(?P<sender_id>\S+)`)
tsDetailPattern2 = regexp.MustCompile(`out: Start remote transfer to (?P<target>[^\s]+) request executed, duration=(?P<duration>[0-9.]+) s`)
tsDetailPattern3 = regexp.MustCompile(`out: Transfer start (?P<thread>\d+/\d+) buffers=(?P<buffers>\d+) files=(?P<files>\d+) size=(?P<size>[0-9.]+) MByte chunksize=(?P<chunksize>\d+) streams=(?P<streams>\d+) target-datarate=(?P<target_datarate>[0-9.]+) MByte/s protocol=(?P<protocol>\w+) src=(?P<src>\S+) receiver=(?P<receiver>\S+)`)
tsDetailPattern4 = regexp.MustCompile(`out: Start transfer (?P<thread>\d+/\d+), src=(?P<src>[^ ]*) dest=(?P<dest>[^ ]*) item\[0\]=(?P<item0>[^ ]*) count=(?P<count>\d+)`)
nginxAccessPattern = regexp.MustCompile(`^(\S+)\s+\S+\s+(\S+)\s+\[([^\]]+)\]\s+"([^"]+)"\s+(\d+)\s+(\d+|-)\s*(?:"([^"]*)"\s+"([^"]*)")?`) nginxAccessPattern = regexp.MustCompile(`^(\S+)\s+\S+\s+(\S+)\s+\[([^\]]+)\]\s+"([^"]+)"\s+(\d+)\s+(\d+|-)\s*(?:"([^"]*)"\s+"([^"]*)")?`)
tjmInnerLogPattern = regexp.MustCompile(`^(?P<transfername>[^ ]+) : (?P<javaclass>\w+)\.(?P<methode>\w+): started transfer session (?P<transferid>[a-f0-9\-]+) on (?P<localaddr>[\d\.]+:\d+) with target address (?P<targetaddr>[a-zA-Z0-9\.\-]+:\d+)`)
) )
func parseTixstreamService(entry LogEntry) LogEntry { func parseTixstreamService(entry LogEntry) LogEntry {
newEntry := entry newEntry := entry
fields := make(map[string]any) fields := make(map[string]any)
matches := tsServicePattern.FindStringSubmatch(newEntry.Message) matches := tsServicePattern.FindStringSubmatch(newEntry.Message)
if len(matches) > 0 { if len(matches) > 0 {
timestamp := strings.Join(strings.Split(matches[2], " "), "T") timestamp := strings.Join(strings.Split(matches[2], " "), "T")
@ -243,8 +249,9 @@ func parseTixstreamService(entry LogEntry) LogEntry {
fields["log_message"] = newEntry.Message fields["log_message"] = newEntry.Message
} }
trNameMatch := tsTransferIDPattern.FindStringSubmatch(fields["log_message"].(string)) trNameMatch := tsTransferIDPattern.FindStringSubmatch(fields["log_message"].(string))
var transferID string
if len(trNameMatch) > 0 { if len(trNameMatch) > 0 {
fields["transfer_id"] = trNameMatch[1] transferID = trNameMatch[1]
fields["log_message"] = trNameMatch[2] fields["log_message"] = trNameMatch[2]
split := strings.Fields(trNameMatch[2]) split := strings.Fields(trNameMatch[2])
switch split[0] { switch split[0] {
@ -261,53 +268,81 @@ func parseTixstreamService(entry LogEntry) LogEntry {
if len(parts) < 5 { if len(parts) < 5 {
return newEntry return newEntry
} }
tsDetail := tsDetailPattern1.FindStringSubmatch(fields["log_message"].(string))
if len(tsDetail) > 0 {
threadInt, _ := strconv.Atoi(strings.Split(tsDetail[1], "/")[0])
fields["thread"] = threadInt
buffersInt, _ := strconv.Atoi(tsDetail[2])
fields["buffers"] = buffersInt
fileCountInt, _ := strconv.Atoi(tsDetail[3])
fields["file_count"] = fileCountInt
fileSizeInt, _ := strconv.Atoi(tsDetail[4])
fields["file_size"] = fileSizeInt
chunkSizeInt, _ := strconv.Atoi(tsDetail[5])
fields["chunksize"] = chunkSizeInt
streamsInt, _ := strconv.Atoi(tsDetail[6])
fields["streams"] = streamsInt
datarateFloat, _ := strconv.ParseFloat(tsDetail[7], 64)
fields["target_datarate"] = datarateFloat
fields["protocoll"] = tsDetail[8]
fields["destination"] = tsDetail[9]
fields["sender_id"] = tsDetail[10]
}
tsDetail = tsDetailPattern2.FindStringSubmatch(fields["log_message"].(string))
if len(tsDetail) > 0 {
fields["transfer_target"] = tsDetail[1]
}
tsDetail = tsDetailPattern3.FindStringSubmatch(fields["log_message"].(string))
if len(tsDetail) > 0 {
threadInt, _ := strconv.Atoi(strings.Split(tsDetail[1], "/")[0])
fields["thread"] = threadInt
buffersInt, _ := strconv.Atoi(tsDetail[2])
fields["buffers"] = buffersInt
fileCountInt, _ := strconv.Atoi(tsDetail[3])
fields["file_count"] = fileCountInt
fileSizeInt, _ := strconv.Atoi(tsDetail[4])
fields["file_size"] = fileSizeInt
chunkSizeInt, _ := strconv.Atoi(tsDetail[5])
fields["chunksize"] = chunkSizeInt
streamsInt, _ := strconv.Atoi(tsDetail[6])
fields["streams"] = streamsInt
datarateFloat, _ := strconv.ParseFloat(tsDetail[7], 64)
fields["target_datarate"] = datarateFloat
fields["protocoll"] = tsDetail[8]
fields["source"] = tsDetail[9]
fields["receiver_id"] = tsDetail[10]
}
tsDetail = tsDetailPattern4.FindStringSubmatch(fields["log_message"].(string))
if len(tsDetail) > 0 {
threadInt, _ := strconv.Atoi(strings.Split(tsDetail[1], "/")[0])
fields["thread"] = threadInt
fields["source"] = tsDetail[2]
fields["destination"] = tsDetail[3]
fields["item"] = tsDetail[4]
fields["count"] = tsDetail[5]
}
if strings.Contains(fields["log_message"].(string), "Transfer start") || strings.Contains(fields["log_message"].(string), "Transfer started,") {
fields["event"] = "transfer_started"
fields["start_time"] = fields["message_timestamp"]
}
if strings.Contains(fields["log_message"].(string), "Transfer stopped local state=finished") {
fields["event"] = "transfer_stopped"
fields["end_time"] = fields["message_timestamp"]
}
// value, ok := fields["transfer_id"]
if transferID != "" {
fields["transfer_identifier"] = transferID
}
if fields["transfer_identifier"] == nil {
fields["transfer_identifier"] = "unknown"
}
if fields["message_timestamp"] == nil {
fields["message_timestamp"] = newEntry.Timestamp
}
newEntry.Fields = fields newEntry.Fields = fields
return newEntry return newEntry
} }
// func parseTixstreamService(entry LogEntry) LogEntry {
// newEntry := entry
// msg := strings.ReplaceAll(entry.Message, " ", " ")
// parts := strings.Fields(msg)
// if len(parts) < 5 {
// return newEntry
// }
//
// info := parts[4:]
//
// if newEntry.Fields == nil {
// newEntry.Fields = make(map[string]any)
// }
// newEntry.Fields["log_level"] = parts[0]
// newEntry.Fields["message_date"] = parts[1]
// newEntry.Fields["message_time"] = parts[2]
// newEntry.Fields["transfer_id"] = parts[3]
// newEntry.Fields["log_message"] = strings.Join(info, " ")
//
// switch info[0] {
// case "in:":
// newEntry.Fields["log_type"] = "log_message"
// newEntry.Fields["transfer_direction"] = "incoming"
// newEntry.Fields["transfer_info"] = strings.Join(info[1:], " ")
// case "out:":
// newEntry.Fields["log_type"] = "log_message"
// newEntry.Fields["transfer_direction"] = "outgoing"
// newEntry.Fields["transfer_info"] = strings.Join(info[1:], " ")
// case "queue-stats:":
// newEntry.Fields["log_type"] = "queue-stats"
// newEntry.Fields["queue-stats"] = strings.Join(info[1:], " ")
// case "transfer:":
// newEntry.Fields["log_type"] = "transfer_info"
// newEntry.Fields["transfer_info"] = strings.Join(info[1:], " ")
// default:
// newEntry.Fields["log_type"] = "log_message"
// newEntry.Fields["transfer_info"] = strings.Join(info, " ")
// }
//
// return newEntry
// }
func parseTJMService(entry LogEntry) LogEntry { func parseTJMService(entry LogEntry) LogEntry {
newEntry := entry newEntry := entry
logContent := entry.Message logContent := entry.Message
@ -335,8 +370,10 @@ func parseTJMService(entry LogEntry) LogEntry {
fields["log_message"] = logContent fields["log_message"] = logContent
} }
trNameMatch := tjmTransferNamePattern.FindStringSubmatch(fields["log_message"].(string)) trNameMatch := tjmTransferNamePattern.FindStringSubmatch(fields["log_message"].(string))
var transferName string
var transferID string
if len(trNameMatch) > 0 { if len(trNameMatch) > 0 {
fields["transfer_name"] = trNameMatch[1] transferName = trNameMatch[1]
fields["log_message"] = trNameMatch[2] fields["log_message"] = trNameMatch[2]
if strings.Contains(trNameMatch[1], "-in") { if strings.Contains(trNameMatch[1], "-in") {
fields["transfer_direction"] = "incoming" fields["transfer_direction"] = "incoming"
@ -345,32 +382,28 @@ func parseTJMService(entry LogEntry) LogEntry {
fields["transfer_direction"] = "outgoing" fields["transfer_direction"] = "outgoing"
} }
} }
value, ok := newEntry.Fields["log_message"] trIDMatch := tjmTransferIDPattern1.FindStringSubmatch(fields["log_message"].(string))
if ok { if len(trIDMatch) > 0 {
matches := tjmInnerLogPattern.FindStringSubmatch(value.(string)) transferID = trIDMatch[1]
groups := tjmInnerLogPattern.SubexpNames() }
if len(matches) >= 1 { trIDMatch = tjmTransferIDPattern2.FindStringSubmatch(fields["log_message"].(string))
for i, name := range groups { if len(trIDMatch) > 0 {
if i != 0 && name != "" { transferID = trIDMatch[2]
newEntry.Fields[name] = matches[i] }
} // value, ok := fields["transfer_id"]
} if transferID != "" {
} else { fields["transfer_identifier"] = transferID
if strings.Contains(value.(string), "TransferJobTixstreamFile") || strings.Contains(value.(string), "PeerJobController.handlePeerJobAction") { } else if transferName != "" {
if strings.Contains(value.(string), "started transfer session") { // value, ok := fields["transfer_name"]
tmpSplit := strings.Split(value.(string), " ") // if ok {
newEntry.Fields["transfer_name"] = tmpSplit[0] fields["transfer_identifier"] = transferName
newEntry.Fields["java_class_method"] = tmpSplit[2] // }
transferID, _ := strings.CutPrefix(strings.Join(tmpSplit[3:], " "), "started transfer session") }
newEntry.Fields["transfer-id"] = strings.Split(transferID, " ")[0] if fields["transfer_identifier"] == nil {
} else if strings.Contains(value.(string), "set transfer session id") { fields["transfer_identifier"] = "unknown"
tmpSplit := strings.Split(value.(string), " ") }
newEntry.Fields["java_class_method"] = tmpSplit[0] if fields["message_timestamp"] == nil {
transferID := tmpSplit[len(tmpSplit)-1] fields["message_timestamp"] = newEntry.Timestamp
newEntry.Fields["transfer-id"] = transferID
}
}
}
} }
newEntry.Fields = fields newEntry.Fields = fields
@ -378,93 +411,30 @@ func parseTJMService(entry LogEntry) LogEntry {
return newEntry return newEntry
} }
// func parseTJMService(entry LogEntry) LogEntry {
// newEntry := entry
// msg := strings.TrimSpace(entry.Message)
// msg = strings.ReplaceAll(msg, " ", " ")
// msg = strings.ReplaceAll(msg, "---", "")
// msg = strings.ReplaceAll(msg, " ", " ")
// parts := strings.Fields(msg)
// if len(parts) < 4 {
// return newEntry
// }
//
// info := parts[3:]
//
// if newEntry.Fields == nil {
// newEntry.Fields = make(map[string]any)
// }
// newEntry.Fields["log_level"] = parts[2]
// newEntry.Fields["message_date"] = parts[0]
// newEntry.Fields["message_time"] = parts[1]
// newEntry.Fields["message"] = strings.Join(info, " ")
//
// tmpInfo := strings.ReplaceAll(strings.Join(info, " "), "[ ]", "[]")
// tmpInfo = strings.ReplaceAll(tmpInfo, "[ ", "[")
// tmpSplit := strings.Fields(tmpInfo)
// if len(tmpSplit) > 4 {
// newEntry.Fields["username"] = tmpSplit[2]
// newEntry.Fields["correlation_id"] = tmpSplit[1]
// newEntry.Fields["thread_id"] = tmpSplit[3]
// newEntry.Fields["java_class"] = tmpSplit[4]
// newEntry.Fields["log_type"] = "log_message"
// if len(tmpSplit) > 6 && strings.Contains(tmpSplit[6], "-out") {
// newEntry.Fields["transfer_direction"] = "outgoing"
// newEntry.Fields["log_message"] = strings.Join(tmpSplit[7:], " ")
// } else if len(tmpSplit) > 6 && strings.Contains(tmpSplit[6], "-in") {
// newEntry.Fields["transfer_direction"] = "incoming"
// newEntry.Fields["log_message"] = strings.Join(tmpSplit[7:], " ")
// } else {
// newEntry.Fields["log_message"] = strings.Join(tmpSplit[6:], " ")
// }
// }
// value, ok := newEntry.Fields["log_message"]
// if ok {
// matches := tjmInnerLogPattern.FindStringSubmatch(value.(string))
// groups := tjmInnerLogPattern.SubexpNames()
// if len(matches) >= 1 {
// for i, name := range groups {
// if i != 0 && name != "" {
// newEntry.Fields[name] = matches[i]
// }
// }
// } else {
// if strings.Contains(value.(string), "TransferJobTixstreamFile") || strings.Contains(value.(string), "PeerJobController.handlePeerJobAction") {
// if strings.Contains(value.(string), "started transfer session") {
// tmpSplit := strings.Split(value.(string), " ")
// newEntry.Fields["transfer_name"] = tmpSplit[0]
// newEntry.Fields["java_class_method"] = tmpSplit[2]
// transferID, _ := strings.CutPrefix(strings.Join(tmpSplit[3:], " "), "started transfer session")
// newEntry.Fields["transfer-id"] = strings.Split(transferID, " ")[0]
// } else if strings.Contains(value.(string), "set transfer session id") {
// tmpSplit := strings.Split(value.(string), " ")
// newEntry.Fields["java_class_method"] = tmpSplit[0]
// transferID := tmpSplit[len(tmpSplit)-1]
// newEntry.Fields["transfer-id"] = transferID
// }
// }
// }
// }
//
// return newEntry
// }
func parseAMService(entry LogEntry) LogEntry { func parseAMService(entry LogEntry) LogEntry {
newEntry := entry newEntry := entry
if newEntry.Fields == nil { if newEntry.Fields == nil {
newEntry.Fields = make(map[string]any) newEntry.Fields = make(map[string]any)
} }
fields := make(map[string]any)
matches := amServicePattern.FindStringSubmatch(strings.TrimSpace(entry.Message)) matches := amServicePattern.FindStringSubmatch(strings.TrimSpace(entry.Message))
if len(matches) != 7 { if len(matches) != 7 {
return newEntry return newEntry
} }
timestamp := strings.Join(strings.Split(matches[1], " "), "T")
fields["message_timestamp"] = timestamp
fields["log_level"] = matches[2]
fields["process_id"] = matches[3]
fields["thread_id"] = strings.TrimSpace(matches[4])
fields["logger_name"] = matches[5]
fields["log_message"] = matches[6]
newEntry.Fields["log_level"] = matches[2] fields["transfer_identifier"] = "unknown"
newEntry.Fields["process_id"] = matches[3] if fields["message_timestamp"] == nil {
newEntry.Fields["thread_name"] = strings.TrimSpace(matches[4]) fields["message_timestamp"] = newEntry.Timestamp
newEntry.Fields["logger_name"] = matches[5] }
newEntry.Fields["log_message"] = matches[6] newEntry.Fields = fields
return newEntry return newEntry
} }
@ -475,10 +445,24 @@ func parseTCCService(entry LogEntry) LogEntry {
newEntry.Fields = make(map[string]any) newEntry.Fields = make(map[string]any)
} }
fields := make(map[string]any)
matches := tccServicePattern.FindStringSubmatch(strings.TrimSpace(entry.Message)) matches := tccServicePattern.FindStringSubmatch(strings.TrimSpace(entry.Message))
if len(matches) != 7 { if len(matches) != 7 {
return newEntry return newEntry
} }
timestamp := strings.Join(strings.Split(matches[1], " "), "T")
fields["message_timestamp"] = timestamp
fields["log_level"] = matches[2]
fields["process_id"] = matches[3]
fields["thread_id"] = strings.TrimSpace(matches[4])
fields["logger_name"] = matches[5]
fields["log_message"] = matches[6]
fields["transfer_identifier"] = "unknown"
if fields["message_timestamp"].(string) == "" {
fields["message_timestamp"] = newEntry.Timestamp
}
newEntry.Fields = fields
newEntry.Fields["timestamp"] = matches[1] newEntry.Fields["timestamp"] = matches[1]
newEntry.Fields["log_level"] = matches[2] newEntry.Fields["log_level"] = matches[2]

View file

@ -4,28 +4,41 @@ import (
"context" "context"
"fmt" "fmt"
"log/slog" "log/slog"
"net"
"os"
"slices" "slices"
"sort" "sort"
"strconv"
"strings"
"syscall"
"time" "time"
"github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v8"
"github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/disk" "github.com/shirou/gopsutil/disk"
"github.com/shirou/gopsutil/host" "github.com/shirou/gopsutil/host"
"github.com/shirou/gopsutil/load"
"github.com/shirou/gopsutil/mem" "github.com/shirou/gopsutil/mem"
"github.com/shirou/gopsutil/net" psnet "github.com/shirou/gopsutil/net"
"github.com/shirou/gopsutil/process" "github.com/shirou/gopsutil/process"
"golang.org/x/sys/unix"
) )
type SystemMetricsCollector struct { type SystemMetricsCollector struct {
config SystemMetrics config SystemMetrics
pollInterval int pollInterval int
lastNetworkStats map[string]NetworkStat
lastDiskStats map[string]DiskIOStat
lastMeasureTime time.Time
} }
func NewSystemMetricsCollector(config SystemMetrics, pollInterval int) *SystemMetricsCollector { func NewSystemMetricsCollector(config SystemMetrics, pollInterval int) *SystemMetricsCollector {
return &SystemMetricsCollector{ return &SystemMetricsCollector{
config: config, config: config,
pollInterval: pollInterval, pollInterval: pollInterval,
lastNetworkStats: make(map[string]NetworkStat),
lastDiskStats: make(map[string]DiskIOStat),
lastMeasureTime: time.Now(),
} }
} }
@ -88,10 +101,306 @@ func (smc *SystemMetricsCollector) collectMetrics() (SystemResources, error) {
slog.Warn("failed to collect process metrics", "error", err) slog.Warn("failed to collect process metrics", "error", err)
} }
} }
if smc.config.CollectDiskIO {
if err = smc.collectDiskIOMetrics(&result); err != nil {
slog.Warn("failed to collect disk IO metrics", "error", err)
}
}
if smc.config.CollectNetworkConnections {
if err = smc.collectNetworkConnections(&result); err != nil {
slog.Warn("failed to collect network connections", "error", err)
}
}
if smc.config.CollectLoadAverage {
if err = smc.collectLoadAverage(&result); err != nil {
slog.Warn("failed to collect load average", "error", err)
}
}
if smc.config.CollectTCPStats {
if err = smc.collectTCPStats(&result); err != nil {
slog.Warn("failed to collect TCP stats", "error", err)
}
}
if smc.config.CollectNetworkLatency {
if err = smc.collectNetworkLatency(&result); err != nil {
slog.Warn("failed to collect network latency", "error", err)
}
}
if smc.config.CollectBandwidthUsage {
if err = smc.collectBandwidthUsage(&result); err != nil {
slog.Warn("failed to collect bandwidth usage", "error", err)
}
}
if smc.config.CollectFileHandles {
if err = smc.collectSystemLimits(&result); err != nil {
slog.Warn("failed to collect system limits", "error", err)
}
}
return result, nil return result, nil
} }
func (smc *SystemMetricsCollector) collectDiskIOMetrics(result *SystemResources) error {
diskIOStats, err := disk.IOCounters()
if err != nil {
return err
}
currentTime := time.Now()
timeDiff := currentTime.Sub(smc.lastMeasureTime).Seconds()
result.DiskIOStats = make(map[string]DiskIOStat)
for device, stats := range diskIOStats {
ioStat := DiskIOStat{
ReadBytes: stats.ReadBytes,
WriteBytes: stats.WriteBytes,
ReadOps: stats.ReadCount,
WriteOps: stats.WriteCount,
ReadTime: stats.ReadTime,
WriteTime: stats.WriteTime,
}
if stats.ReadCount > 0 {
ioStat.AvgReadLatency = float64(stats.ReadTime) / float64(stats.ReadCount)
}
if stats.WriteCount > 0 {
ioStat.AvgWriteLatency = float64(stats.WriteTime) / float64(stats.WriteCount)
}
if timeDiff > 0 {
totalTime := float64(stats.ReadTime + stats.WriteTime)
ioStat.IOUtilization = (totalTime / (timeDiff * 1000)) * 100
if ioStat.IOUtilization > 100 {
ioStat.IOUtilization = 100
}
}
result.DiskIOStats[device] = ioStat
}
return nil
}
func (smc *SystemMetricsCollector) collectNetworkConnections(result *SystemResources) error {
connections, err := psnet.Connections("all")
if err != nil {
return err
}
stats := ConnectionStats{
ConnectionsByState: make(map[string]int32),
}
for _, conn := range connections {
stats.TotalConnections++
stats.ConnectionsByState[conn.Status]++
switch conn.Status {
case "ESTABLISHED":
stats.EstablishedTCP++
case "LISTEN":
stats.ListeningTCP++
case "TIME_WAIT":
stats.TimeWaitTCP++
}
if slices.Contains(smc.config.TransferPorts, int(conn.Laddr.Port)) ||
slices.Contains(smc.config.TransferPorts, int(conn.Raddr.Port)) {
stats.TransferConnections++
}
}
result.NetworkConnections = stats
return nil
}
func (smc *SystemMetricsCollector) collectLoadAverage(result *SystemResources) error {
loadAvg, err := load.Avg()
if err != nil {
return err
}
result.LoadAverage = append(result.LoadAverage, loadAvg.Load1)
result.LoadAverage = append(result.LoadAverage, loadAvg.Load5)
result.LoadAverage = append(result.LoadAverage, loadAvg.Load15)
return nil
}
func (smc *SystemMetricsCollector) collectTCPStats(result *SystemResources) error {
tcpStats := TCPStatistics{}
if data, err := os.ReadFile("/proc/net/netstat"); err == nil {
content := string(data)
lines := strings.SplitSeq(content, "\n")
for line := range lines {
if strings.HasPrefix(line, "TcpExt:") {
}
}
}
result.TCPStats = tcpStats
return nil
}
func (smc *SystemMetricsCollector) collectNetworkLatency(result *SystemResources) error {
result.NetworkLatency = make(map[string]LatencyInfo)
for _, host := range smc.config.LatencyTestHosts {
latency := smc.measureLatency(host)
result.NetworkLatency[host] = latency
}
return nil
}
func (smc *SystemMetricsCollector) measureLatency(host string) LatencyInfo {
var latencies []time.Duration
var successful int
for range 5 {
start := time.Now()
conn, err := net.DialTimeout("tcp", host+":80", 3*time.Second)
if err == nil {
latency := time.Since(start)
latencies = append(latencies, latency)
conn.Close()
successful++
}
time.Sleep(100 * time.Millisecond)
}
if len(latencies) == 0 {
return LatencyInfo{Host: host, PacketLoss: 100.0}
}
var total time.Duration
min := latencies[0]
max := latencies[0]
for _, lat := range latencies {
total += lat
if lat < min {
min = lat
}
if lat > max {
max = lat
}
}
avg := total / time.Duration(len(latencies))
packetLoss := float64(5-successful) / 5.0 * 100.0
jitter := max - min
return LatencyInfo{
Host: host,
MinLatency: min,
MaxLatency: max,
AvgLatency: avg,
PacketLoss: packetLoss,
Jitter: jitter,
}
}
func (smc *SystemMetricsCollector) collectBandwidthUsage(result *SystemResources) error {
netStats, err := psnet.IOCounters(true)
if err != nil {
return err
}
result.BandwidthUtilization = make(map[string]BandwidthInfo)
currentTime := time.Now()
timeDiff := currentTime.Sub(smc.lastMeasureTime).Seconds()
for _, stat := range netStats {
if len(smc.config.NetworkInterfaces) > 0 &&
!slices.Contains(smc.config.NetworkInterfaces, stat.Name) {
continue
}
bandwidth := BandwidthInfo{Interface: stat.Name}
if lastStat, exists := smc.lastNetworkStats[stat.Name]; exists && timeDiff > 0 {
bytesDiffIn := float64(stat.BytesRecv - lastStat.BytesRecv)
bytesDiffOut := float64(stat.BytesSent - lastStat.BytesSent)
bandwidth.CurrentThroughputIn = (bytesDiffIn / timeDiff) / (1024 * 1024) // MB/s
bandwidth.CurrentThroughputOut = (bytesDiffOut / timeDiff) / (1024 * 1024)
bandwidth.PeakThroughputIn = bandwidth.CurrentThroughputIn
bandwidth.PeakThroughputOut = bandwidth.CurrentThroughputOut
linkCapacityMbps := 1000.0
totalThroughput := bandwidth.CurrentThroughputIn + bandwidth.CurrentThroughputOut
bandwidth.UtilizationPercent = (totalThroughput / linkCapacityMbps) * 100
}
result.BandwidthUtilization[stat.Name] = bandwidth
}
for _, stat := range netStats {
smc.lastNetworkStats[stat.Name] = NetworkStat{
BytesSent: stat.BytesSent,
BytesRecv: stat.BytesRecv,
PacketsSent: stat.PacketsSent,
PacketsRecv: stat.PacketsRecv,
}
}
smc.lastMeasureTime = currentTime
return nil
}
func (smc *SystemMetricsCollector) collectSystemLimits(result *SystemResources) error {
limits := SystemLimitInfo{}
if data, err := os.ReadFile("/proc/sys/fs/file-max"); err == nil {
if maxFiles, err := strconv.ParseUint(strings.TrimSpace(string(data)), 10, 64); err == nil {
limits.MaxOpenFiles = maxFiles
}
}
if data, err := os.ReadFile("/proc/sys/fs/file-nr"); err == nil {
fields := strings.Fields(string(data))
if len(fields) >= 1 {
if currentFiles, err := strconv.ParseUint(fields[0], 10, 64); err == nil {
limits.CurrentOpenFiles = currentFiles
if limits.MaxOpenFiles > 0 {
limits.FileDescriptorUsage = float64(currentFiles) / float64(limits.MaxOpenFiles) * 100
}
}
}
}
var rlimit syscall.Rlimit
if err := syscall.Getrlimit(unix.RLIMIT_NPROC, &rlimit); err == nil {
limits.MaxProcesses = rlimit.Max
}
result.SystemLimits = limits
return nil
}
// // Hilfsfunktionen
// func NewSystemResources() SystemResources {
// return SystemResources{
// Timestamp: time.Now(),
// DiskUsage: make(map[string]DiskUsage),
// DiskIOStats: make(map[string]DiskIOStat),
// NetworkStats: make(map[string]NetworkStat),
// NetworkLatency: make(map[string]LatencyInfo),
// BandwidthUtilization: make(map[string]BandwidthInfo),
// }
// }
func (smc *SystemMetricsCollector) collectProcessMetrics(result *SystemResources) error { func (smc *SystemMetricsCollector) collectProcessMetrics(result *SystemResources) error {
processes, err := process.Processes() processes, err := process.Processes()
if err != nil { if err != nil {
@ -206,7 +515,7 @@ func (smc *SystemMetricsCollector) collectDiskMetrics(result *SystemResources) e
} }
func (smc *SystemMetricsCollector) collectNetworkMetrics(result *SystemResources) error { func (smc *SystemMetricsCollector) collectNetworkMetrics(result *SystemResources) error {
netStats, err := net.IOCounters(true) netStats, err := psnet.IOCounters(true)
if err != nil { if err != nil {
return err return err
} }

View file

@ -11,7 +11,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v8"
) )
type WebService struct { type WebService struct {