tixel-elastic: initial commit with first prototype version for discussion

This commit is contained in:
Patryk Hegenberg 2025-09-01 11:14:53 +02:00
commit 3af5853421
13 changed files with 1900 additions and 0 deletions

129
config.go Normal file
View file

@ -0,0 +1,129 @@
package main
import (
"fmt"
"log"
"github.com/spf13/viper"
)
type WebConfig struct {
Enabled bool `mapstructure:"enabled"`
Port int `mapstructure:"port"`
Host string `mapstructure:"host"`
}
type LogFormat struct {
Name string `mapstructure:"name"`
Pattern string `mapstructure:"pattern"`
Fields map[string]string `mapstructure:"fields"`
}
type ToolConfig struct {
Name string `mapstructure:"name"`
LogFile string `mapstructure:"log_file"`
Format LogFormat `mapstructure:"format"`
Enabled bool `mapstructure:"enabled"`
BufferSize int `mapstructure:"buffer_size"`
}
type ServiceConfig struct {
Name string `mapstructure:"name"`
Service string `mapstructure:"service"`
Enabled bool `mapstructure:"enabled"`
SinceTime string `mapstructure:"since_time"`
Priority string `mapstructure:"priority"`
}
type ElasticsearchConfig struct {
URL string `mapstructure:"url"`
Index string `mapstructure:"index"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
Timeout int `mapstructure:"timeout"`
}
type SystemMetrics struct {
Enabled bool `mapstructure:"enabled"`
CollectCPU bool `mapstructure:"collect_cpu"`
CollectMemory bool `mapstructure:"collect_memory"`
CollectDisk bool `mapstructure:"collect_disk"`
CollectNetwork bool `mapstructure:"collect_network"`
DiskPaths []string `mapstructure:"disk_paths"`
NetworkInterfaces []string `mapstructure:"network_interfaces"`
}
type Config struct {
Elasticsearch ElasticsearchConfig `mapstructure:"elasticsearch"`
Tools []ToolConfig `mapstructure:"tools"`
Services []ServiceConfig `mapstructure:"services"`
PollIntervalSeconds int `mapstructure:"poll_interval_seconds"`
SystemMetrics SystemMetrics `mapstructure:"system_metrics"`
WebService WebConfig `mapstructure:"web_service"`
Logging struct {
Level string `mapstructure:"level"`
FilePath string `mapstructure:"file_path"`
} `mapstructure:"logging"`
}
func LoadConfig() (*Config, error) {
viper.SetConfigName("config")
viper.AddConfigPath(".")
viper.AddConfigPath("/etc/tixel-watch/")
viper.SetConfigType("yaml")
setConfigDefaults()
if err := viper.ReadInConfig(); err != nil {
return nil, fmt.Errorf("error reading config: %w", err)
}
var cfg Config
if err := viper.Unmarshal(&cfg); err != nil {
return nil, fmt.Errorf("error parsing config: %w", err)
}
if err := validateConfig(&cfg); err != nil {
return nil, fmt.Errorf("config validation failed: %w", err)
}
return &cfg, nil
}
func setConfigDefaults() {
viper.SetDefault("poll_interval_seconds", 30)
viper.SetDefault("elasticsearch.timeout", 30)
viper.SetDefault("system_metrics.enabled", true)
viper.SetDefault("system_metrics.collect_cpu", true)
viper.SetDefault("system_metrics.collect_memory", true)
viper.SetDefault("system_metrics.collect_disk", true)
viper.SetDefault("system_metrics.collect_network", false)
viper.SetDefault("system_metrics.disk_paths", []string{"/"})
viper.SetDefault("web_service.enabled", false)
viper.SetDefault("web_service.port", 8080)
viper.SetDefault("web_service.host", "localhost")
viper.SetDefault("logging.level", "info")
}
func validateConfig(cfg *Config) error {
if cfg.Elasticsearch.URL == "" {
return fmt.Errorf("elasticsearch.url ist erforderlich")
}
if cfg.Elasticsearch.Index == "" {
return fmt.Errorf("elasticsearch.index ist erforderlich")
}
if cfg.PollIntervalSeconds <= 0 {
log.Printf("Warnung: poll_interval_seconds ist %d, setze auf 30", cfg.PollIntervalSeconds)
cfg.PollIntervalSeconds = 30
}
for i := range cfg.Tools {
if cfg.Tools[i].BufferSize <= 0 {
cfg.Tools[i].BufferSize = 100
}
}
return nil
}

90
config.yaml Normal file
View file

@ -0,0 +1,90 @@
elasticsearch:
url: "http://localhost:9200"
index: "tixel"
username: "elastic"
password: "79QQ4JGTa3R_nkqA=MxW"
timeout: 30
web_service:
enabled: true
host: "localhost"
port: 8080
system_metrics:
enabled: true
collect_cpu: true
collect_memory: true
collect_disk: true
collect_network: false
disk_paths:
- "/"
- "/var"
- "/home"
network_interfaces:
- "eth0"
- "wlan0"
poll_interval_seconds: 30
logging:
level: "info"
file_path: "/var/log/system-monitor.log"
services:
- name: "nginx"
service: "nginx.service"
enabled: true
since_time: ""
priority: "info"
- name: "tixstream"
service: "tixstream.service"
enabled: true
since_time: ""
priority: "debug"
- name: "transfer-job-manager"
service: "transfer-job-manager.service"
enabled: true
since_time: ""
priority: "debug"
tools:
- name: "nginx-access"
log_file: "/var/log/nginx/access.log"
enabled: true
buffer_size: 200
format:
name: "nginx_combined"
pattern: '^(?P<client_ip>\S+) \S+ \S+ \[(?P<timestamp>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+) (?P<protocol>\S+)" (?P<status>\d+) (?P<body_bytes>\d+) "(?P<referer>[^"]*)" "(?P<user_agent>[^"]*)"'
fields:
client_ip: "remote_addr"
timestamp: "time_local"
method: "request_method"
path: "request_uri"
protocol: "server_protocol"
status: "status"
body_bytes: "body_bytes_sent"
referer: "http_referer"
user_agent: "http_user_agent"
- name: "nginx-error"
log_file: "/var/log/nginx/error.log"
enabled: true
buffer_size: 100
format:
name: "nginx_error"
pattern: '^(?P<timestamp>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(?P<level>\w+)\] (?P<pid>\d+)#(?P<tid>\d+): (?P<message>.*)'
fields:
timestamp: "time"
level: "log_level"
pid: "process_id"
tid: "thread_id"
message: "error_message"
- name: "nginx-tjm"
log_file: "/var/log/nginx/access_tjm.log"
enabled: true
buffer_size: 100
format:
name: "custom"

148
elasticsearch.go Normal file
View file

@ -0,0 +1,148 @@
package main
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"strings"
"time"
"github.com/elastic/go-elasticsearch/v7"
)
func NewElasticsearchClient(config ElasticsearchConfig) (*elasticsearch.Client, error) {
esConfig := elasticsearch.Config{
Addresses: []string{config.URL},
}
if config.Username != "" && config.Password != "" {
esConfig.Username = config.Username
esConfig.Password = config.Password
}
client, err := elasticsearch.NewClient(esConfig)
if err != nil {
return nil, fmt.Errorf("failed to create elasticsearch client: %w", err)
}
return client, nil
}
func TestElasticsearchConnection(es *elasticsearch.Client) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
res, err := es.Info(es.Info.WithContext(ctx))
if err != nil {
return fmt.Errorf("connection test failed: %w", err)
}
defer res.Body.Close()
if res.IsError() {
return fmt.Errorf("connection test failed: %s", res.String())
}
return nil
}
type ElasticsearchSender interface {
SendBatch(baseIndex string, entries []LogEntry) error
SendSystemMetrics(baseIndex string, metrics SystemResources) error
}
type ElasticsearchClient struct {
client *elasticsearch.Client
}
func NewElasticsearchSender(client *elasticsearch.Client) ElasticsearchSender {
return &ElasticsearchClient{client: client}
}
func (esc *ElasticsearchClient) SendBatch(baseIndex string, entries []LogEntry) error {
if len(entries) == 0 {
return nil
}
var body strings.Builder
for _, entry := range entries {
indexName := determineIndexName(baseIndex, entry)
indexLine := fmt.Sprintf(`{"index":{"_index":"%s"}}`, indexName)
body.WriteString(indexLine)
body.WriteString("\n")
data, err := json.Marshal(entry)
if err != nil {
slog.Error("error marshalling JSON", "error", err)
continue
}
body.WriteString(string(data))
body.WriteString("\n")
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
res, err := esc.client.Bulk(
strings.NewReader(body.String()),
esc.client.Bulk.WithContext(ctx),
)
if err != nil {
return fmt.Errorf("bulk request error: %w", err)
}
defer res.Body.Close()
if res.IsError() {
return fmt.Errorf("bulk request failed: %s", res.String())
}
slog.Debug("Batch successfully sent", "count", len(entries))
return nil
}
func (esc *ElasticsearchClient) SendSystemMetrics(baseIndex string, metrics SystemResources) error {
data, err := json.Marshal(metrics)
if err != nil {
return fmt.Errorf("JSON marshalling error: %w", err)
}
systemIndex := fmt.Sprintf("%s-system", baseIndex)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
res, err := esc.client.Index(
systemIndex,
strings.NewReader(string(data)),
esc.client.Index.WithContext(ctx),
)
if err != nil {
return fmt.Errorf("elasticsearch index error: %w", err)
}
defer res.Body.Close()
if res.IsError() {
return fmt.Errorf("elasticsearch error: %s", res.String())
}
slog.Debug("System-Metrics sent",
"CPU", metrics.CPUPercent,
"MEM_used", metrics.MemoryUsed,
"MEM_total", metrics.MemoryTotal,
"MEM_percentage", metrics.MemoryPercent,
)
return nil
}
func determineIndexName(baseIndex string, entry LogEntry) string {
switch entry.Type {
case "system_metrics":
return fmt.Sprintf("%s-system", baseIndex)
case "service_log":
return fmt.Sprintf("%s-service-%s", baseIndex, entry.Service)
default:
return fmt.Sprintf("%s-%s", baseIndex, entry.Tool)
}
}

272
elasticsearch_exporter.go Normal file
View file

@ -0,0 +1,272 @@
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"strings"
"time"
"github.com/elastic/go-elasticsearch/v7"
)
type ElasticsearchExporter struct {
client *elasticsearch.Client
}
func NewElasticsearchExporter(client *elasticsearch.Client) *ElasticsearchExporter {
return &ElasticsearchExporter{
client: client,
}
}
type ExportResult struct {
Index string `json:"index"`
DocumentCount int `json:"document_count"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Duration string `json:"duration"`
Error string `json:"error,omitempty"`
}
func (e *ElasticsearchExporter) ExportToStream(ctx context.Context, indices []string, batchSize int, writer io.Writer) error {
startTime := time.Now()
if _, err := writer.Write([]byte("{\n \"export_info\": {\n")); err != nil {
return fmt.Errorf("error writing export header: %w", err)
}
exportInfo := map[string]any{
"timestamp": startTime,
"indices": indices,
"batch_size": batchSize,
}
infoBytes, err := json.MarshalIndent(exportInfo, " ", " ")
if err != nil {
return fmt.Errorf("error marshalling export info: %w", err)
}
infoStr := string(infoBytes)
infoStr = strings.TrimPrefix(infoStr, "{")
infoStr = strings.TrimSuffix(infoStr, "}")
if _, err := writer.Write([]byte(infoStr)); err != nil {
return fmt.Errorf("error writing export info: %w", err)
}
if _, err := writer.Write([]byte("\n },\n \"data\": {\n")); err != nil {
return fmt.Errorf("error writing data header: %w", err)
}
results := make([]ExportResult, 0, len(indices))
first := true
for _, index := range indices {
if !first {
if _, err := writer.Write([]byte(",\n")); err != nil {
return fmt.Errorf("error writing separator: %w", err)
}
}
first = false
result := e.exportIndex(ctx, index, batchSize, writer)
results = append(results, result)
if result.Error != "" {
slog.Error("error exporting index", "index", index, "error", result.Error)
}
}
if _, err := writer.Write([]byte("\n },\n \"results\": ")); err != nil {
return fmt.Errorf("error writing results header: %w", err)
}
if err := json.NewEncoder(writer).Encode(results); err != nil {
return fmt.Errorf("error writing results: %w", err)
}
if _, err := writer.Write([]byte("}\n")); err != nil {
return fmt.Errorf("error writing final bracket: %w", err)
}
duration := time.Since(startTime)
slog.Info("Export completed", "duration", duration, "indices_count", len(indices))
return nil
}
func (e *ElasticsearchExporter) exportIndex(ctx context.Context, index string, batchSize int, writer io.Writer) ExportResult {
startTime := time.Now()
result := ExportResult{
Index: index,
StartTime: startTime,
}
if _, err := fmt.Fprintf(writer, " \"%s\": [\n", index); err != nil {
result.Error = fmt.Sprintf("error writing index header: %v", err)
result.EndTime = time.Now()
result.Duration = time.Since(startTime).String()
return result
}
query := `{"query":{"match_all":{}}}`
res, err := e.client.Search(
e.client.Search.WithContext(ctx),
e.client.Search.WithIndex(index),
e.client.Search.WithScroll(1000),
e.client.Search.WithSize(batchSize),
e.client.Search.WithBody(strings.NewReader(query)),
)
if err != nil {
result.Error = fmt.Sprintf("error in initial search: %v", err)
result.EndTime = time.Now()
result.Duration = time.Since(startTime).String()
return result
}
defer res.Body.Close()
if res.IsError() {
result.Error = fmt.Sprintf("elasticsearch error: %s", res.String())
result.EndTime = time.Now()
result.Duration = time.Since(startTime).String()
return result
}
var searchResult map[string]any
if err := json.NewDecoder(res.Body).Decode(&searchResult); err != nil {
result.Error = fmt.Sprintf("error decoding search result: %v", err)
result.EndTime = time.Now()
result.Duration = time.Since(startTime).String()
return result
}
scrollID, ok := searchResult["_scroll_id"].(string)
if !ok {
result.Error = "no scroll_id found in search result"
result.EndTime = time.Now()
result.Duration = time.Since(startTime).String()
return result
}
hits := searchResult["hits"].(map[string]any)["hits"].([]any)
firstDocument := true
documentCount := 0
for _, hit := range hits {
if !firstDocument {
if _, err := writer.Write([]byte(",\n")); err != nil {
result.Error = fmt.Sprintf("error writing separator: %v", err)
result.EndTime = time.Now()
result.Duration = time.Since(startTime).String()
return result
}
}
firstDocument = false
source := hit.(map[string]any)["_source"]
if err := e.writeDocument(writer, source); err != nil {
result.Error = fmt.Sprintf("error writing document: %v", err)
result.EndTime = time.Now()
result.Duration = time.Since(startTime).String()
return result
}
documentCount++
}
for {
select {
case <-ctx.Done():
result.Error = "context cancelled"
result.EndTime = time.Now()
result.Duration = time.Since(startTime).String()
return result
default:
}
scrollRes, err := e.client.Scroll(
e.client.Scroll.WithScrollID(scrollID),
e.client.Scroll.WithScroll(1000),
e.client.Scroll.WithContext(ctx),
)
if err != nil {
result.Error = fmt.Sprintf("error in scroll request: %v", err)
break
}
defer scrollRes.Body.Close()
if scrollRes.IsError() {
result.Error = fmt.Sprintf("elasticsearch scroll error: %s", scrollRes.String())
break
}
var scrollResult map[string]any
if err := json.NewDecoder(scrollRes.Body).Decode(&scrollResult); err != nil {
result.Error = fmt.Sprintf("error decoding scroll result: %v", err)
break
}
hits := scrollResult["hits"].(map[string]any)["hits"].([]any)
if len(hits) == 0 {
break
}
scrollID, _ = scrollResult["_scroll_id"].(string)
for _, hit := range hits {
if _, err := writer.Write([]byte(",\n")); err != nil {
result.Error = fmt.Sprintf("error writing separator: %v", err)
result.EndTime = time.Now()
result.Duration = time.Since(startTime).String()
return result
}
source := hit.(map[string]any)["_source"]
if err := e.writeDocument(writer, source); err != nil {
result.Error = fmt.Sprintf("error writing document: %v", err)
result.EndTime = time.Now()
result.Duration = time.Since(startTime).String()
return result
}
documentCount++
}
}
if _, err := writer.Write([]byte("\n ]")); err != nil {
if result.Error == "" {
result.Error = fmt.Sprintf("error writing index footer: %v", err)
}
}
result.DocumentCount = documentCount
result.EndTime = time.Now()
result.Duration = time.Since(startTime).String()
slog.Info("Index export completed",
"index", index,
"documents", documentCount,
"duration", result.Duration,
)
return result
}
func (e *ElasticsearchExporter) writeDocument(writer io.Writer, document any) error {
jsonBytes, err := json.MarshalIndent(document, " ", " ")
if err != nil {
return fmt.Errorf("error marshalling document: %w", err)
}
if _, err := writer.Write([]byte(" ")); err != nil {
return err
}
if _, err := writer.Write(jsonBytes); err != nil {
return err
}
return nil
}

203
file_monitor.go Normal file
View file

@ -0,0 +1,203 @@
package main
import (
"context"
"fmt"
"log/slog"
"regexp"
"strings"
"github.com/hpcloud/tail"
)
type FileMonitor struct {
config ToolConfig
parser LogParser
}
type LogParser interface {
Parse(line string, toolName string) LogEntry
}
func NewFileMonitor(config ToolConfig) *FileMonitor {
var parser LogParser
if config.Format.Pattern != "" {
pattern, err := regexp.Compile(config.Format.Pattern)
if err != nil {
slog.Error("invalid regex pattern", "tool", config.Name, "error", err)
parser = &DefaultLogParser{}
} else {
parser = &RegexLogParser{
pattern: pattern,
fields: config.Format.Fields,
}
}
} else {
switch config.Name {
case "nginx-tjm":
parser = &NginxTJMLogParser{}
default:
parser = &DefaultLogParser{}
}
}
return &FileMonitor{
config: config,
parser: parser,
}
}
func (fm *FileMonitor) Start(ctx context.Context, out chan<- LogEntry) error {
t, err := tail.TailFile(fm.config.LogFile, tail.Config{
Follow: true,
ReOpen: true,
MustExist: false,
Poll: true,
Location: &tail.SeekInfo{Offset: 0, Whence: 2},
})
if err != nil {
return fmt.Errorf("tail.TailFile: %w", err)
}
defer t.Stop()
slog.Debug("Started tailing file", "file", fm.config.LogFile, "tool", fm.config.Name)
for {
select {
case <-ctx.Done():
slog.Debug("File monitor stopped", "tool", fm.config.Name)
return nil
case line, ok := <-t.Lines:
if !ok {
return nil
}
if line.Err != nil {
slog.Error("error reading log file", "tool", fm.config.Name, "error", line.Err)
continue
}
if strings.TrimSpace(line.Text) == "" {
continue
}
entry := fm.parser.Parse(line.Text, fm.config.Name)
select {
case out <- entry:
case <-ctx.Done():
return nil
default:
slog.Warn("Log-Channel is full, entry dropped", "tool", fm.config.Name)
}
}
}
}
type DefaultLogParser struct{}
func (p *DefaultLogParser) Parse(line string, toolName string) LogEntry {
entry := NewLogEntry("log_entry")
entry.Tool = toolName
entry.Message = strings.TrimSpace(line)
entry.Raw = line
return entry
}
type RegexLogParser struct {
pattern *regexp.Regexp
fields map[string]string
}
func (p *RegexLogParser) Parse(line string, toolName string) LogEntry {
entry := NewLogEntry("log_entry")
entry.Tool = toolName
entry.Raw = line
fields := p.parseWithPattern(line)
if fields != nil {
entry.Fields = fields
} else {
entry.Message = strings.TrimSpace(line)
}
return entry
}
func (p *RegexLogParser) parseWithPattern(text string) map[string]any {
matches := p.pattern.FindStringSubmatch(text)
if matches == nil {
return nil
}
fields := make(map[string]any)
subexpNames := p.pattern.SubexpNames()
for i, match := range matches {
if i == 0 {
continue
}
if i < len(subexpNames) && subexpNames[i] != "" {
fieldName := subexpNames[i]
if mappedName, exists := p.fields[fieldName]; exists {
fieldName = mappedName
}
fields[fieldName] = match
}
}
return fields
}
type NginxTJMLogParser struct{}
func (p *NginxTJMLogParser) Parse(line string, toolName string) LogEntry {
entry := NewLogEntry("log_entry")
entry.Tool = toolName
entry.Raw = line
entry.Fields = p.parseNginxTJM(line)
return entry
}
func (p *NginxTJMLogParser) parseNginxTJM(text string) map[string]any {
parts := strings.Fields(text)
if len(parts) < 10 {
return map[string]any{
"raw": text,
}
}
fields := make(map[string]any)
if len(parts) > 0 {
timestamp := strings.Trim(parts[0], "[]")
fields["timestamp"] = timestamp
}
if len(parts) > 2 {
fields["client_ip"] = parts[2]
}
for i, part := range parts {
if strings.HasPrefix(part, "\"") {
if i+1 < len(parts) {
fields["method"] = strings.Trim(part, "\"")
fields["route"] = parts[i+1]
}
break
}
}
for _, part := range parts {
if after, ok := strings.CutPrefix(part, "status="); ok {
fields["status"] = after
break
}
}
return fields
}

33
go.mod Normal file
View file

@ -0,0 +1,33 @@
module tixel_elastic
go 1.24.1
require (
github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/hpcloud/tail v1.0.0
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/spf13/viper v1.20.1
)
require (
github.com/fsnotify/fsnotify v1.8.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/sagikazarmark/locafero v0.7.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.12.0 // indirect
github.com/spf13/cast v1.7.1 // indirect
github.com/spf13/pflag v1.0.6 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/tklauser/go-sysconf v0.3.15 // indirect
github.com/tklauser/numcpus v0.10.0 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/text v0.21.0 // indirect
gopkg.in/fsnotify.v1 v1.4.7 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

71
go.sum Normal file
View file

@ -0,0 +1,71 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo=
github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M=
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss=
github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M=
github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/sagikazarmark/locafero v0.7.0 h1:5MqpDsTGNDhY8sGp0Aowyf0qKsPrhewaLSsFaodPcyo=
github.com/sagikazarmark/locafero v0.7.0/go.mod h1:2za3Cg5rMaTMoG/2Ulr9AwtFaIppKXTRYnozin4aB5k=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spf13/afero v1.12.0 h1:UcOPyRBYczmFn6yvphxkn9ZEOY65cpwGKb5mL36mrqs=
github.com/spf13/afero v1.12.0/go.mod h1:ZTlWwG4/ahT8W7T0WQ5uYmjI9duaLQGy3Q2OAl4sk/4=
github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y=
github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o=
github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.20.1 h1:ZMi+z/lvLyPSCoNtFCpqjy0S4kPbirhpTMwl8BkW9X4=
github.com/spf13/viper v1.20.1/go.mod h1:P9Mdzt1zoHIG8m2eZQinpiBjo6kCmZSKBClNNqjJvu4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/tklauser/go-sysconf v0.3.15 h1:VE89k0criAymJ/Os65CSn1IXaol+1wrsFHEB8Ol49K4=
github.com/tklauser/go-sysconf v0.3.15/go.mod h1:Dmjwr6tYFIseJw7a3dRLJfsHAMXZ3nEnL/aZY+0IuI4=
github.com/tklauser/numcpus v0.10.0 h1:18njr6LDBk1zuna922MgdjQuJFjrdppsZG60sHGfjso=
github.com/tklauser/numcpus v0.10.0/go.mod h1:BiTKazU708GQTYF4mB+cmlpT2Is1gLk7XVuEeem8LsQ=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

83
log_processor.go Normal file
View file

@ -0,0 +1,83 @@
package main
import (
"context"
"log/slog"
"time"
"github.com/elastic/go-elasticsearch/v7"
)
type LogProcessor struct {
sender ElasticsearchSender
baseIndex string
batchSize int
}
func NewLogProcessor(es *elasticsearch.Client, baseIndex string) *LogProcessor {
return &LogProcessor{
sender: NewElasticsearchSender(es),
baseIndex: baseIndex,
batchSize: 100,
}
}
func (lp *LogProcessor) Start(ctx context.Context, logChan <-chan LogEntry) {
batch := make([]LogEntry, 0, lp.batchSize)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
slog.Info("Log processor started", "batch_size", lp.batchSize)
for {
select {
case <-ctx.Done():
if len(batch) > 0 {
lp.sendBatch(batch)
}
slog.Info("Log processor stopped")
return
case entry, ok := <-logChan:
if !ok {
if len(batch) > 0 {
lp.sendBatch(batch)
}
return
}
batch = append(batch, entry)
if len(batch) >= lp.batchSize {
lp.sendBatch(batch)
batch = batch[:0]
}
case <-ticker.C:
if len(batch) > 0 {
lp.sendBatch(batch)
batch = batch[:0]
}
}
}
}
func (lp *LogProcessor) sendBatch(batch []LogEntry) {
if len(batch) == 0 {
return
}
if err := lp.sender.SendBatch(lp.baseIndex, batch); err != nil {
slog.Error("error sending log batch", "error", err, "batch_size", len(batch))
return
}
slog.Debug("Log batch sent successfully", "batch_size", len(batch))
}
func (lp *LogProcessor) SetBatchSize(size int) {
if size > 0 {
lp.batchSize = size
slog.Info("Log processor batch size changed", "new_size", size)
}
}

137
main.go Normal file
View file

@ -0,0 +1,137 @@
package main
import (
"context"
"log/slog"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
var hostname string
func init() {
var err error
hostname, err = os.Hostname()
if err != nil {
hostname = "unknown"
}
}
func main() {
cfg, err := LoadConfig()
if err != nil {
slog.Error("error loading configuration", "error", err)
os.Exit(1)
}
es, err := NewElasticsearchClient(cfg.Elasticsearch)
if err != nil {
slog.Error("elasticsearch client error", "error", err)
os.Exit(1)
}
if err := TestElasticsearchConnection(es); err != nil {
slog.Error("elasticsearch connection test failed", "error", err)
os.Exit(1)
}
slog.Info("TIXEL System Monitor started - Elasticsearch connection successful")
logChan := make(chan LogEntry, 1000)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
for _, service := range cfg.Services {
if !service.Enabled {
slog.Info("Service deactivated, skipping...", "service", service.Name)
continue
}
wg.Add(1)
go func(s ServiceConfig) {
defer wg.Done()
monitor := NewServiceMonitor(s)
if err := monitor.Start(ctx, logChan); err != nil {
slog.Error("error watching service", "service", s.Name, "error", err)
}
}(service)
slog.Info("started watching Service-Log", "service", service.Name)
}
for _, tool := range cfg.Tools {
if !tool.Enabled {
slog.Info("Tool is deactivated, skipping...", "tool", tool.Name)
continue
}
wg.Add(1)
go func(t ToolConfig) {
defer wg.Done()
monitor := NewFileMonitor(t)
if err := monitor.Start(ctx, logChan); err != nil {
slog.Error("error watching", "tool", t.Name, "error", err)
}
}(tool)
slog.Info("started watching logs", "tool", tool.Name, "file", tool.LogFile)
}
if cfg.SystemMetrics.Enabled {
wg.Add(1)
go func() {
defer wg.Done()
collector := NewSystemMetricsCollector(cfg.SystemMetrics, cfg.PollIntervalSeconds)
collector.Start(ctx, es, cfg.Elasticsearch.Index)
}()
slog.Info("Started collecting System-Metrics")
}
wg.Add(1)
go func() {
defer wg.Done()
processor := NewLogProcessor(es, cfg.Elasticsearch.Index)
processor.Start(ctx, logChan)
}()
if cfg.WebService.Enabled {
wg.Add(1)
go func() {
defer wg.Done()
webService := NewWebService(cfg, es)
if err := webService.Start(ctx); err != nil {
slog.Error("web service error", "error", err)
}
}()
slog.Info("Web service started", "host", cfg.WebService.Host, "port", cfg.WebService.Port)
}
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
<-sigCh
slog.Info("Shutdown-Signal received, stopping threads...")
cancel()
close(logChan)
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
slog.Info("All threads closed")
case <-time.After(10 * time.Second):
slog.Info("Shutdown-Timeout reached, force quitting")
}
slog.Info("Program stopped")
}

68
models.go Normal file
View file

@ -0,0 +1,68 @@
package main
import (
"time"
)
type SystemResources struct {
Timestamp time.Time `json:"@timestamp"`
Type string `json:"type"`
Host string `json:"host"`
CPUPercent float64 `json:"cpu_percent,omitempty"`
MemoryUsed uint64 `json:"memory_used,omitempty"`
MemoryTotal uint64 `json:"memory_total,omitempty"`
MemoryPercent float64 `json:"memory_percent,omitempty"`
DiskUsage map[string]DiskUsage `json:"disk_usage,omitempty"`
NetworkStats map[string]NetworkStat `json:"network_stats,omitempty"`
LoadAverage []float64 `json:"load_average,omitempty"`
Uptime uint64 `json:"uptime,omitempty"`
}
type DiskUsage struct {
Used uint64 `json:"used"`
Total uint64 `json:"total"`
UsedPercent float64 `json:"used_percent"`
Free uint64 `json:"free"`
}
type NetworkStat struct {
BytesSent uint64 `json:"bytes_sent"`
BytesRecv uint64 `json:"bytes_recv"`
PacketsSent uint64 `json:"packets_sent"`
PacketsRecv uint64 `json:"packets_recv"`
}
type LogEntry struct {
Timestamp time.Time `json:"@timestamp"`
Type string `json:"type"`
Host string `json:"host"`
Tool string `json:"tool,omitempty"`
Service string `json:"service,omitempty"`
Message string `json:"message,omitempty"`
Fields map[string]any `json:"fields,omitempty"`
Raw string `json:"raw,omitempty"`
Priority string `json:"priority,omitempty"`
Unit string `json:"unit,omitempty"`
PID int `json:"pid,omitempty"`
BootID string `json:"boot_id,omitempty"`
MachineID string `json:"machine_id,omitempty"`
}
func NewLogEntry(entryType string) LogEntry {
return LogEntry{
Timestamp: time.Now(),
Type: entryType,
Host: hostname,
Fields: make(map[string]any),
}
}
func NewSystemResources() SystemResources {
return SystemResources{
Timestamp: time.Now(),
Type: "system_metrics",
Host: hostname,
DiskUsage: make(map[string]DiskUsage),
NetworkStats: make(map[string]NetworkStat),
}
}

310
service_monitor.go Normal file
View file

@ -0,0 +1,310 @@
package main
import (
"bufio"
"context"
"encoding/json"
"fmt"
"log/slog"
"os/exec"
"strconv"
"strings"
"time"
)
type ServiceMonitor struct {
config ServiceConfig
}
func NewServiceMonitor(config ServiceConfig) *ServiceMonitor {
return &ServiceMonitor{
config: config,
}
}
func (sm *ServiceMonitor) Start(ctx context.Context, out chan<- LogEntry) error {
args := sm.buildJournalctlArgs()
slog.Info("starting journalctl", "arguments", args)
cmd := exec.CommandContext(ctx, args[0], args[1:]...)
stdout, err := cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("error StdoutPipe: %w", err)
}
if err := cmd.Start(); err != nil {
return fmt.Errorf("error start command: %w", err)
}
scanner := bufio.NewScanner(stdout)
go func() {
<-ctx.Done()
if cmd.Process != nil {
cmd.Process.Kill()
}
}()
parser := NewJournalEntryParser(sm.config.Name, sm.config.Service)
for scanner.Scan() {
select {
case <-ctx.Done():
return nil
default:
}
line := scanner.Text()
if strings.TrimSpace(line) == "" {
continue
}
entry, err := parser.Parse(line)
if err != nil {
slog.Error("error parsing journal entry", "service", sm.config.Name, "error", err)
continue
}
select {
case out <- entry:
case <-ctx.Done():
return nil
default:
slog.Warn("Service-Log-Channel is full, entry dropped", "service", sm.config.Name)
}
}
if err := scanner.Err(); err != nil {
return fmt.Errorf("scanner error: %w", err)
}
return cmd.Wait()
}
func (sm *ServiceMonitor) buildJournalctlArgs() []string {
args := []string{
"sudo",
"journalctl",
"-f",
"-o", "json",
"-u", sm.config.Service,
}
if sm.config.SinceTime != "" {
args = append(args, "--since", sm.config.SinceTime)
}
if sm.config.Priority != "" {
args = append(args, "-p", sm.config.Priority)
}
return args
}
type JournalEntryParser struct {
serviceName string
unitName string
}
func NewJournalEntryParser(serviceName, unitName string) *JournalEntryParser {
return &JournalEntryParser{
serviceName: serviceName,
unitName: unitName,
}
}
func (jep *JournalEntryParser) Parse(jsonLine string) (LogEntry, error) {
var journalData map[string]any
if err := json.Unmarshal([]byte(jsonLine), &journalData); err != nil {
return LogEntry{}, fmt.Errorf("JSON unmarshal error: %w", err)
}
entry := NewLogEntry("service_log")
entry.Service = jep.serviceName
entry.Unit = jep.unitName
if tsStr, ok := journalData["__REALTIME_TIMESTAMP"].(string); ok {
if tsInt, err := strconv.ParseInt(tsStr, 10, 64); err == nil {
entry.Timestamp = time.Unix(0, tsInt*1000)
}
}
if entry.Timestamp.IsZero() {
entry.Timestamp = time.Now()
}
if msg, ok := journalData["MESSAGE"].(string); ok {
entry.Message = msg
}
if priority, ok := journalData["PRIORITY"].(string); ok {
entry.Priority = priority
entry.Fields["priority_name"] = jep.getPriorityName(priority)
}
if pidStr, ok := journalData["_PID"].(string); ok {
if pid, err := strconv.Atoi(pidStr); err == nil {
entry.PID = pid
}
}
jep.extractSystemdFields(journalData, &entry)
if bootID, ok := journalData["_BOOT_ID"].(string); ok {
entry.BootID = bootID
}
if machineID, ok := journalData["_MACHINE_ID"].(string); ok {
entry.MachineID = machineID
}
entry.Raw = jsonLine
entry = jep.parseServiceSpecific(entry)
return entry, nil
}
func (jep *JournalEntryParser) getPriorityName(priority string) string {
priorityNames := map[string]string{
"0": "emergency",
"1": "alert",
"2": "critical",
"3": "error",
"4": "warning",
"5": "notice",
"6": "info",
"7": "debug",
}
if name, exists := priorityNames[priority]; exists {
return name
}
return "unknown"
}
func (jep *JournalEntryParser) extractSystemdFields(journalData map[string]any, entry *LogEntry) {
systemdFields := []string{
"_SYSTEMD_UNIT", "_SYSTEMD_USER_UNIT", "_SYSTEMD_SLICE",
"_BOOT_ID", "_MACHINE_ID", "_HOSTNAME", "_TRANSPORT",
"_CAP_EFFECTIVE", "_SELINUX_CONTEXT", "_AUDIT_SESSION",
"_AUDIT_LOGINUID", "_GID", "_UID", "_COMM", "_EXE",
"_CMDLINE", "_SYSTEMD_CGROUP", "_SYSTEMD_SESSION",
"_SYSTEMD_OWNER_UID", "_SOURCE_REALTIME_TIMESTAMP",
}
for _, field := range systemdFields {
if value, ok := journalData[field]; ok {
esFieldName := strings.ToLower(strings.TrimPrefix(field, "_"))
entry.Fields[esFieldName] = value
}
}
}
func (jep *JournalEntryParser) parseServiceSpecific(entry LogEntry) LogEntry {
switch jep.serviceName {
case "tixstream":
return parseTixstreamService(entry)
case "transfer-job-manager":
return parseTJMService(entry)
case "nginx":
return parseNginxService(entry)
default:
return entry
}
}
func parseTixstreamService(entry LogEntry) LogEntry {
newEntry := entry
msg := strings.ReplaceAll(entry.Message, " ", " ")
parts := strings.Fields(msg)
if len(parts) < 5 {
return newEntry
}
logLevel := parts[0]
timestampDate := parts[1]
timestampTime := parts[2]
transferID := parts[3]
info := parts[4:]
if newEntry.Fields == nil {
newEntry.Fields = make(map[string]any)
}
newEntry.Fields["log_level"] = logLevel
newEntry.Fields["message_date"] = timestampDate
newEntry.Fields["message_time"] = timestampTime
newEntry.Fields["transfer_id"] = transferID
newEntry.Fields["log_message"] = strings.Join(info, " ")
if info != nil {
var transferDirection string
var transferInfo []string
var queueStats []string
var logType string
switch info[0] {
case "in:":
logType = "direction_info"
transferDirection = "incoming"
transferInfo = info[1:]
case "out:":
logType = "direction_info"
transferDirection = "outgoing"
transferInfo = info[1:]
case "queue-stats:":
logType = "queue_stats"
queueStats = info[1:]
case "transfer:":
logType = "transfer_info"
transferInfo = info[1:]
default:
logType = "log_message"
transferDirection = ""
transferInfo = info
}
if logType != "" {
newEntry.Fields["log_type"] = logType
}
if transferDirection != "" {
newEntry.Fields["transfer_direction"] = transferDirection
}
if transferInfo != nil {
newEntry.Fields["transfer_info"] = strings.Join(transferInfo, ";")
}
if queueStats != nil {
newEntry.Fields["queue_stats"] = strings.Join(queueStats, ";")
}
}
return newEntry
}
func parseTJMService(entry LogEntry) LogEntry {
newEntry := entry
msg := strings.ReplaceAll(entry.Message, " ", " ")
msg = strings.ReplaceAll(msg, "---", "")
msg = strings.ReplaceAll(msg, " ", " ")
parts := strings.Fields(msg)
if len(parts) < 4 {
return newEntry
}
timestampDate := parts[0]
timestampTime := parts[1]
logLevel := parts[2]
info := parts[3:]
if newEntry.Fields == nil {
newEntry.Fields = make(map[string]any)
}
newEntry.Fields["log_level"] = logLevel
newEntry.Fields["message_date"] = timestampDate
newEntry.Fields["message_time"] = timestampTime
newEntry.Fields["message"] = strings.Join(info, " ")
return newEntry
}
func parseNginxService(entry LogEntry) LogEntry {
return entry
}

154
system_metrics.go Normal file
View file

@ -0,0 +1,154 @@
package main
import (
"context"
"fmt"
"log/slog"
"slices"
"time"
"github.com/elastic/go-elasticsearch/v7"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/disk"
"github.com/shirou/gopsutil/host"
"github.com/shirou/gopsutil/mem"
"github.com/shirou/gopsutil/net"
)
type SystemMetricsCollector struct {
config SystemMetrics
pollInterval int
}
func NewSystemMetricsCollector(config SystemMetrics, pollInterval int) *SystemMetricsCollector {
return &SystemMetricsCollector{
config: config,
pollInterval: pollInterval,
}
}
func (smc *SystemMetricsCollector) Start(ctx context.Context, es *elasticsearch.Client, baseIndex string) {
ticker := time.NewTicker(time.Duration(smc.pollInterval) * time.Second)
defer ticker.Stop()
sender := NewElasticsearchSender(es)
for {
select {
case <-ctx.Done():
slog.Info("System metrics collector stopped")
return
case <-ticker.C:
metrics, err := smc.collectMetrics()
if err != nil {
slog.Error("error collecting system metrics", "error", err)
continue
}
if err := sender.SendSystemMetrics(baseIndex, metrics); err != nil {
slog.Error("error sending system metrics", "error", err)
}
}
}
}
func (smc *SystemMetricsCollector) collectMetrics() (SystemResources, error) {
result := NewSystemResources()
var err error
if smc.config.CollectCPU {
if err = smc.collectCPUMetrics(&result); err != nil {
return result, fmt.Errorf("CPU metrics: %w", err)
}
}
if smc.config.CollectMemory {
if err = smc.collectMemoryMetrics(&result); err != nil {
return result, fmt.Errorf("memory metrics: %w", err)
}
}
if smc.config.CollectDisk {
if err = smc.collectDiskMetrics(&result); err != nil {
return result, fmt.Errorf("disk metrics: %w", err)
}
}
if smc.config.CollectNetwork {
if err = smc.collectNetworkMetrics(&result); err != nil {
return result, fmt.Errorf("network metrics: %w", err)
}
}
return result, nil
}
func (smc *SystemMetricsCollector) collectCPUMetrics(result *SystemResources) error {
cpuPercents, err := cpu.Percent(time.Second, false)
if err != nil {
return err
}
if len(cpuPercents) > 0 {
result.CPUPercent = cpuPercents[0]
}
if hostStat, err := host.Info(); err == nil {
result.Uptime = hostStat.Uptime
}
return nil
}
func (smc *SystemMetricsCollector) collectMemoryMetrics(result *SystemResources) error {
vmStat, err := mem.VirtualMemory()
if err != nil {
return err
}
result.MemoryUsed = vmStat.Used
result.MemoryTotal = vmStat.Total
result.MemoryPercent = vmStat.UsedPercent
return nil
}
func (smc *SystemMetricsCollector) collectDiskMetrics(result *SystemResources) error {
for _, path := range smc.config.DiskPaths {
diskStat, err := disk.Usage(path)
if err != nil {
slog.Error("error reading disk stats", "path", path, "error", err)
continue
}
result.DiskUsage[path] = DiskUsage{
Used: diskStat.Used,
Total: diskStat.Total,
UsedPercent: diskStat.UsedPercent,
Free: diskStat.Free,
}
}
return nil
}
func (smc *SystemMetricsCollector) collectNetworkMetrics(result *SystemResources) error {
netStats, err := net.IOCounters(true)
if err != nil {
return err
}
for _, stat := range netStats {
if len(smc.config.NetworkInterfaces) == 0 || slices.Contains(smc.config.NetworkInterfaces, stat.Name) {
result.NetworkStats[stat.Name] = NetworkStat{
BytesSent: stat.BytesSent,
BytesRecv: stat.BytesRecv,
PacketsSent: stat.PacketsSent,
PacketsRecv: stat.PacketsRecv,
}
}
}
return nil
}

202
web_service.go Normal file
View file

@ -0,0 +1,202 @@
package main
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"strconv"
"strings"
"time"
"github.com/elastic/go-elasticsearch/v7"
)
type WebService struct {
server *http.Server
esClient *elasticsearch.Client
config *Config
}
func NewWebService(config *Config, esClient *elasticsearch.Client) *WebService {
mux := http.NewServeMux()
ws := &WebService{
esClient: esClient,
config: config,
}
mux.HandleFunc("/export", ws.handleExport)
mux.HandleFunc("/health", ws.handleHealth)
mux.HandleFunc("/indices", ws.handleIndices)
addr := fmt.Sprintf("%s:%d", config.WebService.Host, config.WebService.Port)
ws.server = &http.Server{
Addr: addr,
Handler: mux,
ReadTimeout: 30 * time.Second,
WriteTimeout: 300 * time.Second,
IdleTimeout: 60 * time.Second,
}
return ws
}
func (ws *WebService) Start(ctx context.Context) error {
go func() {
<-ctx.Done()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := ws.server.Shutdown(shutdownCtx); err != nil {
slog.Error("web service shutdown error", "error", err)
}
}()
slog.Info("Starting web service", "address", ws.server.Addr)
if err := ws.server.ListenAndServe(); err != http.ErrServerClosed {
return fmt.Errorf("web service error: %w", err)
}
return nil
}
func (ws *WebService) handleExport(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
indices := ws.parseIndicesParam(r)
if len(indices) == 0 {
http.Error(w, "No indices specified. Use ?indices=index1,index2", http.StatusBadRequest)
return
}
size := ws.parseSizeParam(r)
slog.Info("Export request received", "indices", indices, "size", size)
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Disposition", "attachment; filename=elasticsearch_export.json")
exporter := NewElasticsearchExporter(ws.esClient)
if err := exporter.ExportToStream(r.Context(), indices, size, w); err != nil {
slog.Error("export error", "error", err)
http.Error(w, fmt.Sprintf("Export error: %v", err), http.StatusInternalServerError)
return
}
slog.Info("Export completed successfully", "indices", indices)
}
func (ws *WebService) handleHealth(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
res, err := ws.esClient.Info(ws.esClient.Info.WithContext(ctx))
if err != nil {
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(map[string]any{
"status": "unhealthy",
"error": err.Error(),
})
return
}
res.Body.Close()
if res.IsError() {
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(map[string]any{
"status": "unhealthy",
"error": res.String(),
})
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]any{
"status": "healthy",
"timestamp": time.Now(),
})
}
func (ws *WebService) handleIndices(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
res, err := ws.esClient.Cat.Indices(
ws.esClient.Cat.Indices.WithContext(ctx),
ws.esClient.Cat.Indices.WithFormat("json"),
)
if err != nil {
http.Error(w, fmt.Sprintf("Error fetching indices: %v", err), http.StatusInternalServerError)
return
}
defer res.Body.Close()
if res.IsError() {
http.Error(w, fmt.Sprintf("Elasticsearch error: %s", res.String()), http.StatusInternalServerError)
return
}
var indices []map[string]any
if err := json.NewDecoder(res.Body).Decode(&indices); err != nil {
http.Error(w, fmt.Sprintf("Error decoding response: %v", err), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]any{
"indices": indices,
"count": len(indices),
})
}
func (ws *WebService) parseIndicesParam(r *http.Request) []string {
indicesParam := r.URL.Query().Get("indices")
if indicesParam == "" {
return nil
}
indices := strings.Split(indicesParam, ",")
var result []string
for _, index := range indices {
index = strings.TrimSpace(index)
if index != "" {
result = append(result, index)
}
}
return result
}
func (ws *WebService) parseSizeParam(r *http.Request) int {
sizeParam := r.URL.Query().Get("size")
if sizeParam == "" {
return 1000
}
size, err := strconv.Atoi(sizeParam)
if err != nil || size <= 0 {
return 1000
}
if size > 10000 {
size = 10000
}
return size
}