commit 3af58534210d1474bb7560fef8a31076f628f608 Author: Patryk Hegenberg Date: Mon Sep 1 11:14:53 2025 +0200 tixel-elastic: initial commit with first prototype version for discussion diff --git a/config.go b/config.go new file mode 100644 index 0000000..63fab47 --- /dev/null +++ b/config.go @@ -0,0 +1,129 @@ +package main + +import ( + "fmt" + "log" + + "github.com/spf13/viper" +) + +type WebConfig struct { + Enabled bool `mapstructure:"enabled"` + Port int `mapstructure:"port"` + Host string `mapstructure:"host"` +} + +type LogFormat struct { + Name string `mapstructure:"name"` + Pattern string `mapstructure:"pattern"` + Fields map[string]string `mapstructure:"fields"` +} + +type ToolConfig struct { + Name string `mapstructure:"name"` + LogFile string `mapstructure:"log_file"` + Format LogFormat `mapstructure:"format"` + Enabled bool `mapstructure:"enabled"` + BufferSize int `mapstructure:"buffer_size"` +} + +type ServiceConfig struct { + Name string `mapstructure:"name"` + Service string `mapstructure:"service"` + Enabled bool `mapstructure:"enabled"` + SinceTime string `mapstructure:"since_time"` + Priority string `mapstructure:"priority"` +} + +type ElasticsearchConfig struct { + URL string `mapstructure:"url"` + Index string `mapstructure:"index"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` + Timeout int `mapstructure:"timeout"` +} + +type SystemMetrics struct { + Enabled bool `mapstructure:"enabled"` + CollectCPU bool `mapstructure:"collect_cpu"` + CollectMemory bool `mapstructure:"collect_memory"` + CollectDisk bool `mapstructure:"collect_disk"` + CollectNetwork bool `mapstructure:"collect_network"` + DiskPaths []string `mapstructure:"disk_paths"` + NetworkInterfaces []string `mapstructure:"network_interfaces"` +} + +type Config struct { + Elasticsearch ElasticsearchConfig `mapstructure:"elasticsearch"` + Tools []ToolConfig `mapstructure:"tools"` + Services []ServiceConfig `mapstructure:"services"` + PollIntervalSeconds int `mapstructure:"poll_interval_seconds"` + SystemMetrics SystemMetrics `mapstructure:"system_metrics"` + WebService WebConfig `mapstructure:"web_service"` + Logging struct { + Level string `mapstructure:"level"` + FilePath string `mapstructure:"file_path"` + } `mapstructure:"logging"` +} + +func LoadConfig() (*Config, error) { + viper.SetConfigName("config") + viper.AddConfigPath(".") + viper.AddConfigPath("/etc/tixel-watch/") + viper.SetConfigType("yaml") + + setConfigDefaults() + + if err := viper.ReadInConfig(); err != nil { + return nil, fmt.Errorf("error reading config: %w", err) + } + + var cfg Config + if err := viper.Unmarshal(&cfg); err != nil { + return nil, fmt.Errorf("error parsing config: %w", err) + } + + if err := validateConfig(&cfg); err != nil { + return nil, fmt.Errorf("config validation failed: %w", err) + } + + return &cfg, nil +} + +func setConfigDefaults() { + viper.SetDefault("poll_interval_seconds", 30) + viper.SetDefault("elasticsearch.timeout", 30) + viper.SetDefault("system_metrics.enabled", true) + viper.SetDefault("system_metrics.collect_cpu", true) + viper.SetDefault("system_metrics.collect_memory", true) + viper.SetDefault("system_metrics.collect_disk", true) + viper.SetDefault("system_metrics.collect_network", false) + viper.SetDefault("system_metrics.disk_paths", []string{"/"}) + viper.SetDefault("web_service.enabled", false) + viper.SetDefault("web_service.port", 8080) + viper.SetDefault("web_service.host", "localhost") + viper.SetDefault("logging.level", "info") +} + +func validateConfig(cfg *Config) error { + if cfg.Elasticsearch.URL == "" { + return fmt.Errorf("elasticsearch.url ist erforderlich") + } + + if cfg.Elasticsearch.Index == "" { + return fmt.Errorf("elasticsearch.index ist erforderlich") + } + + if cfg.PollIntervalSeconds <= 0 { + log.Printf("Warnung: poll_interval_seconds ist %d, setze auf 30", cfg.PollIntervalSeconds) + cfg.PollIntervalSeconds = 30 + } + + for i := range cfg.Tools { + if cfg.Tools[i].BufferSize <= 0 { + cfg.Tools[i].BufferSize = 100 + } + } + + return nil +} diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..6190a5f --- /dev/null +++ b/config.yaml @@ -0,0 +1,90 @@ +elasticsearch: + url: "http://localhost:9200" + index: "tixel" + username: "elastic" + password: "79QQ4JGTa3R_nkqA=MxW" + timeout: 30 + +web_service: + enabled: true + host: "localhost" + port: 8080 + +system_metrics: + enabled: true + collect_cpu: true + collect_memory: true + collect_disk: true + collect_network: false + disk_paths: + - "/" + - "/var" + - "/home" + network_interfaces: + - "eth0" + - "wlan0" + +poll_interval_seconds: 30 + +logging: + level: "info" + file_path: "/var/log/system-monitor.log" + +services: + - name: "nginx" + service: "nginx.service" + enabled: true + since_time: "" + priority: "info" + + - name: "tixstream" + service: "tixstream.service" + enabled: true + since_time: "" + priority: "debug" + + - name: "transfer-job-manager" + service: "transfer-job-manager.service" + enabled: true + since_time: "" + priority: "debug" + +tools: + - name: "nginx-access" + log_file: "/var/log/nginx/access.log" + enabled: true + buffer_size: 200 + format: + name: "nginx_combined" + pattern: '^(?P\S+) \S+ \S+ \[(?P[^\]]+)\] "(?P\S+) (?P\S+) (?P\S+)" (?P\d+) (?P\d+) "(?P[^"]*)" "(?P[^"]*)"' + fields: + client_ip: "remote_addr" + timestamp: "time_local" + method: "request_method" + path: "request_uri" + protocol: "server_protocol" + status: "status" + body_bytes: "body_bytes_sent" + referer: "http_referer" + user_agent: "http_user_agent" + + - name: "nginx-error" + log_file: "/var/log/nginx/error.log" + enabled: true + buffer_size: 100 + format: + name: "nginx_error" + pattern: '^(?P\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(?P\w+)\] (?P\d+)#(?P\d+): (?P.*)' + fields: + timestamp: "time" + level: "log_level" + pid: "process_id" + tid: "thread_id" + message: "error_message" + + - name: "nginx-tjm" + log_file: "/var/log/nginx/access_tjm.log" + enabled: true + buffer_size: 100 + format: + name: "custom" diff --git a/elasticsearch.go b/elasticsearch.go new file mode 100644 index 0000000..9bc8ecd --- /dev/null +++ b/elasticsearch.go @@ -0,0 +1,148 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "strings" + "time" + + "github.com/elastic/go-elasticsearch/v7" +) + +func NewElasticsearchClient(config ElasticsearchConfig) (*elasticsearch.Client, error) { + esConfig := elasticsearch.Config{ + Addresses: []string{config.URL}, + } + + if config.Username != "" && config.Password != "" { + esConfig.Username = config.Username + esConfig.Password = config.Password + } + + client, err := elasticsearch.NewClient(esConfig) + if err != nil { + return nil, fmt.Errorf("failed to create elasticsearch client: %w", err) + } + + return client, nil +} + +func TestElasticsearchConnection(es *elasticsearch.Client) error { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + res, err := es.Info(es.Info.WithContext(ctx)) + if err != nil { + return fmt.Errorf("connection test failed: %w", err) + } + defer res.Body.Close() + + if res.IsError() { + return fmt.Errorf("connection test failed: %s", res.String()) + } + + return nil +} + +type ElasticsearchSender interface { + SendBatch(baseIndex string, entries []LogEntry) error + SendSystemMetrics(baseIndex string, metrics SystemResources) error +} + +type ElasticsearchClient struct { + client *elasticsearch.Client +} + +func NewElasticsearchSender(client *elasticsearch.Client) ElasticsearchSender { + return &ElasticsearchClient{client: client} +} + +func (esc *ElasticsearchClient) SendBatch(baseIndex string, entries []LogEntry) error { + if len(entries) == 0 { + return nil + } + + var body strings.Builder + for _, entry := range entries { + indexName := determineIndexName(baseIndex, entry) + + indexLine := fmt.Sprintf(`{"index":{"_index":"%s"}}`, indexName) + body.WriteString(indexLine) + body.WriteString("\n") + + data, err := json.Marshal(entry) + if err != nil { + slog.Error("error marshalling JSON", "error", err) + continue + } + body.WriteString(string(data)) + body.WriteString("\n") + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + res, err := esc.client.Bulk( + strings.NewReader(body.String()), + esc.client.Bulk.WithContext(ctx), + ) + if err != nil { + return fmt.Errorf("bulk request error: %w", err) + } + defer res.Body.Close() + + if res.IsError() { + return fmt.Errorf("bulk request failed: %s", res.String()) + } + + slog.Debug("Batch successfully sent", "count", len(entries)) + return nil +} + +func (esc *ElasticsearchClient) SendSystemMetrics(baseIndex string, metrics SystemResources) error { + data, err := json.Marshal(metrics) + if err != nil { + return fmt.Errorf("JSON marshalling error: %w", err) + } + + systemIndex := fmt.Sprintf("%s-system", baseIndex) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + res, err := esc.client.Index( + systemIndex, + strings.NewReader(string(data)), + esc.client.Index.WithContext(ctx), + ) + if err != nil { + return fmt.Errorf("elasticsearch index error: %w", err) + } + defer res.Body.Close() + + if res.IsError() { + return fmt.Errorf("elasticsearch error: %s", res.String()) + } + + slog.Debug("System-Metrics sent", + "CPU", metrics.CPUPercent, + "MEM_used", metrics.MemoryUsed, + "MEM_total", metrics.MemoryTotal, + "MEM_percentage", metrics.MemoryPercent, + ) + + return nil +} + +func determineIndexName(baseIndex string, entry LogEntry) string { + switch entry.Type { + case "system_metrics": + return fmt.Sprintf("%s-system", baseIndex) + case "service_log": + return fmt.Sprintf("%s-service-%s", baseIndex, entry.Service) + default: + return fmt.Sprintf("%s-%s", baseIndex, entry.Tool) + } +} diff --git a/elasticsearch_exporter.go b/elasticsearch_exporter.go new file mode 100644 index 0000000..d1f003b --- /dev/null +++ b/elasticsearch_exporter.go @@ -0,0 +1,272 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "strings" + "time" + + "github.com/elastic/go-elasticsearch/v7" +) + +type ElasticsearchExporter struct { + client *elasticsearch.Client +} + +func NewElasticsearchExporter(client *elasticsearch.Client) *ElasticsearchExporter { + return &ElasticsearchExporter{ + client: client, + } +} + +type ExportResult struct { + Index string `json:"index"` + DocumentCount int `json:"document_count"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + Duration string `json:"duration"` + Error string `json:"error,omitempty"` +} + +func (e *ElasticsearchExporter) ExportToStream(ctx context.Context, indices []string, batchSize int, writer io.Writer) error { + startTime := time.Now() + + if _, err := writer.Write([]byte("{\n \"export_info\": {\n")); err != nil { + return fmt.Errorf("error writing export header: %w", err) + } + + exportInfo := map[string]any{ + "timestamp": startTime, + "indices": indices, + "batch_size": batchSize, + } + + infoBytes, err := json.MarshalIndent(exportInfo, " ", " ") + if err != nil { + return fmt.Errorf("error marshalling export info: %w", err) + } + + infoStr := string(infoBytes) + infoStr = strings.TrimPrefix(infoStr, "{") + infoStr = strings.TrimSuffix(infoStr, "}") + + if _, err := writer.Write([]byte(infoStr)); err != nil { + return fmt.Errorf("error writing export info: %w", err) + } + + if _, err := writer.Write([]byte("\n },\n \"data\": {\n")); err != nil { + return fmt.Errorf("error writing data header: %w", err) + } + + results := make([]ExportResult, 0, len(indices)) + first := true + + for _, index := range indices { + if !first { + if _, err := writer.Write([]byte(",\n")); err != nil { + return fmt.Errorf("error writing separator: %w", err) + } + } + first = false + + result := e.exportIndex(ctx, index, batchSize, writer) + results = append(results, result) + + if result.Error != "" { + slog.Error("error exporting index", "index", index, "error", result.Error) + } + } + + if _, err := writer.Write([]byte("\n },\n \"results\": ")); err != nil { + return fmt.Errorf("error writing results header: %w", err) + } + + if err := json.NewEncoder(writer).Encode(results); err != nil { + return fmt.Errorf("error writing results: %w", err) + } + + if _, err := writer.Write([]byte("}\n")); err != nil { + return fmt.Errorf("error writing final bracket: %w", err) + } + + duration := time.Since(startTime) + slog.Info("Export completed", "duration", duration, "indices_count", len(indices)) + + return nil +} + +func (e *ElasticsearchExporter) exportIndex(ctx context.Context, index string, batchSize int, writer io.Writer) ExportResult { + startTime := time.Now() + result := ExportResult{ + Index: index, + StartTime: startTime, + } + + if _, err := fmt.Fprintf(writer, " \"%s\": [\n", index); err != nil { + result.Error = fmt.Sprintf("error writing index header: %v", err) + result.EndTime = time.Now() + result.Duration = time.Since(startTime).String() + return result + } + + query := `{"query":{"match_all":{}}}` + + res, err := e.client.Search( + e.client.Search.WithContext(ctx), + e.client.Search.WithIndex(index), + e.client.Search.WithScroll(1000), + e.client.Search.WithSize(batchSize), + e.client.Search.WithBody(strings.NewReader(query)), + ) + if err != nil { + result.Error = fmt.Sprintf("error in initial search: %v", err) + result.EndTime = time.Now() + result.Duration = time.Since(startTime).String() + return result + } + defer res.Body.Close() + + if res.IsError() { + result.Error = fmt.Sprintf("elasticsearch error: %s", res.String()) + result.EndTime = time.Now() + result.Duration = time.Since(startTime).String() + return result + } + + var searchResult map[string]any + if err := json.NewDecoder(res.Body).Decode(&searchResult); err != nil { + result.Error = fmt.Sprintf("error decoding search result: %v", err) + result.EndTime = time.Now() + result.Duration = time.Since(startTime).String() + return result + } + + scrollID, ok := searchResult["_scroll_id"].(string) + if !ok { + result.Error = "no scroll_id found in search result" + result.EndTime = time.Now() + result.Duration = time.Since(startTime).String() + return result + } + + hits := searchResult["hits"].(map[string]any)["hits"].([]any) + firstDocument := true + documentCount := 0 + + for _, hit := range hits { + if !firstDocument { + if _, err := writer.Write([]byte(",\n")); err != nil { + result.Error = fmt.Sprintf("error writing separator: %v", err) + result.EndTime = time.Now() + result.Duration = time.Since(startTime).String() + return result + } + } + firstDocument = false + + source := hit.(map[string]any)["_source"] + if err := e.writeDocument(writer, source); err != nil { + result.Error = fmt.Sprintf("error writing document: %v", err) + result.EndTime = time.Now() + result.Duration = time.Since(startTime).String() + return result + } + documentCount++ + } + + for { + select { + case <-ctx.Done(): + result.Error = "context cancelled" + result.EndTime = time.Now() + result.Duration = time.Since(startTime).String() + return result + default: + } + + scrollRes, err := e.client.Scroll( + e.client.Scroll.WithScrollID(scrollID), + e.client.Scroll.WithScroll(1000), + e.client.Scroll.WithContext(ctx), + ) + if err != nil { + result.Error = fmt.Sprintf("error in scroll request: %v", err) + break + } + defer scrollRes.Body.Close() + + if scrollRes.IsError() { + result.Error = fmt.Sprintf("elasticsearch scroll error: %s", scrollRes.String()) + break + } + + var scrollResult map[string]any + if err := json.NewDecoder(scrollRes.Body).Decode(&scrollResult); err != nil { + result.Error = fmt.Sprintf("error decoding scroll result: %v", err) + break + } + + hits := scrollResult["hits"].(map[string]any)["hits"].([]any) + if len(hits) == 0 { + break + } + + scrollID, _ = scrollResult["_scroll_id"].(string) + + for _, hit := range hits { + if _, err := writer.Write([]byte(",\n")); err != nil { + result.Error = fmt.Sprintf("error writing separator: %v", err) + result.EndTime = time.Now() + result.Duration = time.Since(startTime).String() + return result + } + + source := hit.(map[string]any)["_source"] + if err := e.writeDocument(writer, source); err != nil { + result.Error = fmt.Sprintf("error writing document: %v", err) + result.EndTime = time.Now() + result.Duration = time.Since(startTime).String() + return result + } + documentCount++ + } + } + + if _, err := writer.Write([]byte("\n ]")); err != nil { + if result.Error == "" { + result.Error = fmt.Sprintf("error writing index footer: %v", err) + } + } + + result.DocumentCount = documentCount + result.EndTime = time.Now() + result.Duration = time.Since(startTime).String() + + slog.Info("Index export completed", + "index", index, + "documents", documentCount, + "duration", result.Duration, + ) + + return result +} + +func (e *ElasticsearchExporter) writeDocument(writer io.Writer, document any) error { + jsonBytes, err := json.MarshalIndent(document, " ", " ") + if err != nil { + return fmt.Errorf("error marshalling document: %w", err) + } + + if _, err := writer.Write([]byte(" ")); err != nil { + return err + } + + if _, err := writer.Write(jsonBytes); err != nil { + return err + } + + return nil +} diff --git a/file_monitor.go b/file_monitor.go new file mode 100644 index 0000000..83630c9 --- /dev/null +++ b/file_monitor.go @@ -0,0 +1,203 @@ +package main + +import ( + "context" + "fmt" + "log/slog" + "regexp" + "strings" + + "github.com/hpcloud/tail" +) + +type FileMonitor struct { + config ToolConfig + parser LogParser +} + +type LogParser interface { + Parse(line string, toolName string) LogEntry +} + +func NewFileMonitor(config ToolConfig) *FileMonitor { + var parser LogParser + + if config.Format.Pattern != "" { + pattern, err := regexp.Compile(config.Format.Pattern) + if err != nil { + slog.Error("invalid regex pattern", "tool", config.Name, "error", err) + parser = &DefaultLogParser{} + } else { + parser = &RegexLogParser{ + pattern: pattern, + fields: config.Format.Fields, + } + } + } else { + switch config.Name { + case "nginx-tjm": + parser = &NginxTJMLogParser{} + default: + parser = &DefaultLogParser{} + } + } + + return &FileMonitor{ + config: config, + parser: parser, + } +} + +func (fm *FileMonitor) Start(ctx context.Context, out chan<- LogEntry) error { + t, err := tail.TailFile(fm.config.LogFile, tail.Config{ + Follow: true, + ReOpen: true, + MustExist: false, + Poll: true, + Location: &tail.SeekInfo{Offset: 0, Whence: 2}, + }) + if err != nil { + return fmt.Errorf("tail.TailFile: %w", err) + } + defer t.Stop() + + slog.Debug("Started tailing file", "file", fm.config.LogFile, "tool", fm.config.Name) + + for { + select { + case <-ctx.Done(): + slog.Debug("File monitor stopped", "tool", fm.config.Name) + return nil + case line, ok := <-t.Lines: + if !ok { + return nil + } + + if line.Err != nil { + slog.Error("error reading log file", "tool", fm.config.Name, "error", line.Err) + continue + } + + if strings.TrimSpace(line.Text) == "" { + continue + } + + entry := fm.parser.Parse(line.Text, fm.config.Name) + + select { + case out <- entry: + case <-ctx.Done(): + return nil + default: + slog.Warn("Log-Channel is full, entry dropped", "tool", fm.config.Name) + } + } + } +} + +type DefaultLogParser struct{} + +func (p *DefaultLogParser) Parse(line string, toolName string) LogEntry { + entry := NewLogEntry("log_entry") + entry.Tool = toolName + entry.Message = strings.TrimSpace(line) + entry.Raw = line + return entry +} + +type RegexLogParser struct { + pattern *regexp.Regexp + fields map[string]string +} + +func (p *RegexLogParser) Parse(line string, toolName string) LogEntry { + entry := NewLogEntry("log_entry") + entry.Tool = toolName + entry.Raw = line + + fields := p.parseWithPattern(line) + if fields != nil { + entry.Fields = fields + } else { + entry.Message = strings.TrimSpace(line) + } + + return entry +} + +func (p *RegexLogParser) parseWithPattern(text string) map[string]any { + matches := p.pattern.FindStringSubmatch(text) + if matches == nil { + return nil + } + + fields := make(map[string]any) + subexpNames := p.pattern.SubexpNames() + + for i, match := range matches { + if i == 0 { + continue + } + + if i < len(subexpNames) && subexpNames[i] != "" { + fieldName := subexpNames[i] + + if mappedName, exists := p.fields[fieldName]; exists { + fieldName = mappedName + } + + fields[fieldName] = match + } + } + + return fields +} + +type NginxTJMLogParser struct{} + +func (p *NginxTJMLogParser) Parse(line string, toolName string) LogEntry { + entry := NewLogEntry("log_entry") + entry.Tool = toolName + entry.Raw = line + entry.Fields = p.parseNginxTJM(line) + return entry +} + +func (p *NginxTJMLogParser) parseNginxTJM(text string) map[string]any { + parts := strings.Fields(text) + if len(parts) < 10 { + return map[string]any{ + "raw": text, + } + } + + fields := make(map[string]any) + + if len(parts) > 0 { + timestamp := strings.Trim(parts[0], "[]") + fields["timestamp"] = timestamp + } + + if len(parts) > 2 { + fields["client_ip"] = parts[2] + } + + for i, part := range parts { + if strings.HasPrefix(part, "\"") { + if i+1 < len(parts) { + fields["method"] = strings.Trim(part, "\"") + fields["route"] = parts[i+1] + } + break + } + } + + for _, part := range parts { + if after, ok := strings.CutPrefix(part, "status="); ok { + fields["status"] = after + break + } + } + + return fields +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..ceeab46 --- /dev/null +++ b/go.mod @@ -0,0 +1,33 @@ +module tixel_elastic + +go 1.24.1 + +require ( + github.com/elastic/go-elasticsearch/v7 v7.17.10 + github.com/hpcloud/tail v1.0.0 + github.com/shirou/gopsutil v3.21.11+incompatible + github.com/spf13/viper v1.20.1 +) + +require ( + github.com/fsnotify/fsnotify v1.8.0 // indirect + github.com/go-ole/go-ole v1.2.6 // indirect + github.com/go-viper/mapstructure/v2 v2.2.1 // indirect + github.com/pelletier/go-toml/v2 v2.2.3 // indirect + github.com/sagikazarmark/locafero v0.7.0 // indirect + github.com/sourcegraph/conc v0.3.0 // indirect + github.com/spf13/afero v1.12.0 // indirect + github.com/spf13/cast v1.7.1 // indirect + github.com/spf13/pflag v1.0.6 // indirect + github.com/subosito/gotenv v1.6.0 // indirect + github.com/tklauser/go-sysconf v0.3.15 // indirect + github.com/tklauser/numcpus v0.10.0 // indirect + github.com/yusufpapurcu/wmi v1.2.4 // indirect + go.uber.org/atomic v1.9.0 // indirect + go.uber.org/multierr v1.9.0 // indirect + golang.org/x/sys v0.31.0 // indirect + golang.org/x/text v0.21.0 // indirect + gopkg.in/fsnotify.v1 v1.4.7 // indirect + gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..a57c825 --- /dev/null +++ b/go.sum @@ -0,0 +1,71 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= +github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= +github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M= +github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= +github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M= +github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/sagikazarmark/locafero v0.7.0 h1:5MqpDsTGNDhY8sGp0Aowyf0qKsPrhewaLSsFaodPcyo= +github.com/sagikazarmark/locafero v0.7.0/go.mod h1:2za3Cg5rMaTMoG/2Ulr9AwtFaIppKXTRYnozin4aB5k= +github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= +github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= +github.com/spf13/afero v1.12.0 h1:UcOPyRBYczmFn6yvphxkn9ZEOY65cpwGKb5mL36mrqs= +github.com/spf13/afero v1.12.0/go.mod h1:ZTlWwG4/ahT8W7T0WQ5uYmjI9duaLQGy3Q2OAl4sk/4= +github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y= +github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= +github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= +github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/viper v1.20.1 h1:ZMi+z/lvLyPSCoNtFCpqjy0S4kPbirhpTMwl8BkW9X4= +github.com/spf13/viper v1.20.1/go.mod h1:P9Mdzt1zoHIG8m2eZQinpiBjo6kCmZSKBClNNqjJvu4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= +github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= +github.com/tklauser/go-sysconf v0.3.15 h1:VE89k0criAymJ/Os65CSn1IXaol+1wrsFHEB8Ol49K4= +github.com/tklauser/go-sysconf v0.3.15/go.mod h1:Dmjwr6tYFIseJw7a3dRLJfsHAMXZ3nEnL/aZY+0IuI4= +github.com/tklauser/numcpus v0.10.0 h1:18njr6LDBk1zuna922MgdjQuJFjrdppsZG60sHGfjso= +github.com/tklauser/numcpus v0.10.0/go.mod h1:BiTKazU708GQTYF4mB+cmlpT2Is1gLk7XVuEeem8LsQ= +github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= +github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= +go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/log_processor.go b/log_processor.go new file mode 100644 index 0000000..07248a3 --- /dev/null +++ b/log_processor.go @@ -0,0 +1,83 @@ +package main + +import ( + "context" + "log/slog" + "time" + + "github.com/elastic/go-elasticsearch/v7" +) + +type LogProcessor struct { + sender ElasticsearchSender + baseIndex string + batchSize int +} + +func NewLogProcessor(es *elasticsearch.Client, baseIndex string) *LogProcessor { + return &LogProcessor{ + sender: NewElasticsearchSender(es), + baseIndex: baseIndex, + batchSize: 100, + } +} + +func (lp *LogProcessor) Start(ctx context.Context, logChan <-chan LogEntry) { + batch := make([]LogEntry, 0, lp.batchSize) + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + slog.Info("Log processor started", "batch_size", lp.batchSize) + + for { + select { + case <-ctx.Done(): + if len(batch) > 0 { + lp.sendBatch(batch) + } + slog.Info("Log processor stopped") + return + + case entry, ok := <-logChan: + if !ok { + if len(batch) > 0 { + lp.sendBatch(batch) + } + return + } + + batch = append(batch, entry) + + if len(batch) >= lp.batchSize { + lp.sendBatch(batch) + batch = batch[:0] + } + + case <-ticker.C: + if len(batch) > 0 { + lp.sendBatch(batch) + batch = batch[:0] + } + } + } +} + +func (lp *LogProcessor) sendBatch(batch []LogEntry) { + if len(batch) == 0 { + return + } + + if err := lp.sender.SendBatch(lp.baseIndex, batch); err != nil { + slog.Error("error sending log batch", "error", err, "batch_size", len(batch)) + return + } + + slog.Debug("Log batch sent successfully", "batch_size", len(batch)) +} + +func (lp *LogProcessor) SetBatchSize(size int) { + if size > 0 { + lp.batchSize = size + slog.Info("Log processor batch size changed", "new_size", size) + } +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..14267cb --- /dev/null +++ b/main.go @@ -0,0 +1,137 @@ +package main + +import ( + "context" + "log/slog" + "os" + "os/signal" + "sync" + "syscall" + "time" +) + +var hostname string + +func init() { + var err error + hostname, err = os.Hostname() + if err != nil { + hostname = "unknown" + } +} + +func main() { + cfg, err := LoadConfig() + if err != nil { + slog.Error("error loading configuration", "error", err) + os.Exit(1) + } + + es, err := NewElasticsearchClient(cfg.Elasticsearch) + if err != nil { + slog.Error("elasticsearch client error", "error", err) + os.Exit(1) + } + + if err := TestElasticsearchConnection(es); err != nil { + slog.Error("elasticsearch connection test failed", "error", err) + os.Exit(1) + } + + slog.Info("TIXEL System Monitor started - Elasticsearch connection successful") + + logChan := make(chan LogEntry, 1000) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var wg sync.WaitGroup + + for _, service := range cfg.Services { + if !service.Enabled { + slog.Info("Service deactivated, skipping...", "service", service.Name) + continue + } + + wg.Add(1) + go func(s ServiceConfig) { + defer wg.Done() + monitor := NewServiceMonitor(s) + if err := monitor.Start(ctx, logChan); err != nil { + slog.Error("error watching service", "service", s.Name, "error", err) + } + }(service) + + slog.Info("started watching Service-Log", "service", service.Name) + } + + for _, tool := range cfg.Tools { + if !tool.Enabled { + slog.Info("Tool is deactivated, skipping...", "tool", tool.Name) + continue + } + + wg.Add(1) + go func(t ToolConfig) { + defer wg.Done() + monitor := NewFileMonitor(t) + if err := monitor.Start(ctx, logChan); err != nil { + slog.Error("error watching", "tool", t.Name, "error", err) + } + }(tool) + + slog.Info("started watching logs", "tool", tool.Name, "file", tool.LogFile) + } + + if cfg.SystemMetrics.Enabled { + wg.Add(1) + go func() { + defer wg.Done() + collector := NewSystemMetricsCollector(cfg.SystemMetrics, cfg.PollIntervalSeconds) + collector.Start(ctx, es, cfg.Elasticsearch.Index) + }() + slog.Info("Started collecting System-Metrics") + } + + wg.Add(1) + go func() { + defer wg.Done() + processor := NewLogProcessor(es, cfg.Elasticsearch.Index) + processor.Start(ctx, logChan) + }() + + if cfg.WebService.Enabled { + wg.Add(1) + go func() { + defer wg.Done() + webService := NewWebService(cfg, es) + if err := webService.Start(ctx); err != nil { + slog.Error("web service error", "error", err) + } + }() + slog.Info("Web service started", "host", cfg.WebService.Host, "port", cfg.WebService.Port) + } + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) + + <-sigCh + slog.Info("Shutdown-Signal received, stopping threads...") + + cancel() + close(logChan) + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + slog.Info("All threads closed") + case <-time.After(10 * time.Second): + slog.Info("Shutdown-Timeout reached, force quitting") + } + + slog.Info("Program stopped") +} diff --git a/models.go b/models.go new file mode 100644 index 0000000..914c871 --- /dev/null +++ b/models.go @@ -0,0 +1,68 @@ +package main + +import ( + "time" +) + +type SystemResources struct { + Timestamp time.Time `json:"@timestamp"` + Type string `json:"type"` + Host string `json:"host"` + CPUPercent float64 `json:"cpu_percent,omitempty"` + MemoryUsed uint64 `json:"memory_used,omitempty"` + MemoryTotal uint64 `json:"memory_total,omitempty"` + MemoryPercent float64 `json:"memory_percent,omitempty"` + DiskUsage map[string]DiskUsage `json:"disk_usage,omitempty"` + NetworkStats map[string]NetworkStat `json:"network_stats,omitempty"` + LoadAverage []float64 `json:"load_average,omitempty"` + Uptime uint64 `json:"uptime,omitempty"` +} + +type DiskUsage struct { + Used uint64 `json:"used"` + Total uint64 `json:"total"` + UsedPercent float64 `json:"used_percent"` + Free uint64 `json:"free"` +} + +type NetworkStat struct { + BytesSent uint64 `json:"bytes_sent"` + BytesRecv uint64 `json:"bytes_recv"` + PacketsSent uint64 `json:"packets_sent"` + PacketsRecv uint64 `json:"packets_recv"` +} + +type LogEntry struct { + Timestamp time.Time `json:"@timestamp"` + Type string `json:"type"` + Host string `json:"host"` + Tool string `json:"tool,omitempty"` + Service string `json:"service,omitempty"` + Message string `json:"message,omitempty"` + Fields map[string]any `json:"fields,omitempty"` + Raw string `json:"raw,omitempty"` + Priority string `json:"priority,omitempty"` + Unit string `json:"unit,omitempty"` + PID int `json:"pid,omitempty"` + BootID string `json:"boot_id,omitempty"` + MachineID string `json:"machine_id,omitempty"` +} + +func NewLogEntry(entryType string) LogEntry { + return LogEntry{ + Timestamp: time.Now(), + Type: entryType, + Host: hostname, + Fields: make(map[string]any), + } +} + +func NewSystemResources() SystemResources { + return SystemResources{ + Timestamp: time.Now(), + Type: "system_metrics", + Host: hostname, + DiskUsage: make(map[string]DiskUsage), + NetworkStats: make(map[string]NetworkStat), + } +} diff --git a/service_monitor.go b/service_monitor.go new file mode 100644 index 0000000..2e07dd4 --- /dev/null +++ b/service_monitor.go @@ -0,0 +1,310 @@ +package main + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "log/slog" + "os/exec" + "strconv" + "strings" + "time" +) + +type ServiceMonitor struct { + config ServiceConfig +} + +func NewServiceMonitor(config ServiceConfig) *ServiceMonitor { + return &ServiceMonitor{ + config: config, + } +} + +func (sm *ServiceMonitor) Start(ctx context.Context, out chan<- LogEntry) error { + args := sm.buildJournalctlArgs() + + slog.Info("starting journalctl", "arguments", args) + + cmd := exec.CommandContext(ctx, args[0], args[1:]...) + + stdout, err := cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("error StdoutPipe: %w", err) + } + + if err := cmd.Start(); err != nil { + return fmt.Errorf("error start command: %w", err) + } + + scanner := bufio.NewScanner(stdout) + + go func() { + <-ctx.Done() + if cmd.Process != nil { + cmd.Process.Kill() + } + }() + + parser := NewJournalEntryParser(sm.config.Name, sm.config.Service) + + for scanner.Scan() { + select { + case <-ctx.Done(): + return nil + default: + } + + line := scanner.Text() + if strings.TrimSpace(line) == "" { + continue + } + + entry, err := parser.Parse(line) + if err != nil { + slog.Error("error parsing journal entry", "service", sm.config.Name, "error", err) + continue + } + + select { + case out <- entry: + case <-ctx.Done(): + return nil + default: + slog.Warn("Service-Log-Channel is full, entry dropped", "service", sm.config.Name) + } + } + + if err := scanner.Err(); err != nil { + return fmt.Errorf("scanner error: %w", err) + } + + return cmd.Wait() +} + +func (sm *ServiceMonitor) buildJournalctlArgs() []string { + args := []string{ + "sudo", + "journalctl", + "-f", + "-o", "json", + "-u", sm.config.Service, + } + + if sm.config.SinceTime != "" { + args = append(args, "--since", sm.config.SinceTime) + } + + if sm.config.Priority != "" { + args = append(args, "-p", sm.config.Priority) + } + + return args +} + +type JournalEntryParser struct { + serviceName string + unitName string +} + +func NewJournalEntryParser(serviceName, unitName string) *JournalEntryParser { + return &JournalEntryParser{ + serviceName: serviceName, + unitName: unitName, + } +} + +func (jep *JournalEntryParser) Parse(jsonLine string) (LogEntry, error) { + var journalData map[string]any + if err := json.Unmarshal([]byte(jsonLine), &journalData); err != nil { + return LogEntry{}, fmt.Errorf("JSON unmarshal error: %w", err) + } + + entry := NewLogEntry("service_log") + entry.Service = jep.serviceName + entry.Unit = jep.unitName + + if tsStr, ok := journalData["__REALTIME_TIMESTAMP"].(string); ok { + if tsInt, err := strconv.ParseInt(tsStr, 10, 64); err == nil { + entry.Timestamp = time.Unix(0, tsInt*1000) + } + } + if entry.Timestamp.IsZero() { + entry.Timestamp = time.Now() + } + + if msg, ok := journalData["MESSAGE"].(string); ok { + entry.Message = msg + } + + if priority, ok := journalData["PRIORITY"].(string); ok { + entry.Priority = priority + entry.Fields["priority_name"] = jep.getPriorityName(priority) + } + + if pidStr, ok := journalData["_PID"].(string); ok { + if pid, err := strconv.Atoi(pidStr); err == nil { + entry.PID = pid + } + } + + jep.extractSystemdFields(journalData, &entry) + + if bootID, ok := journalData["_BOOT_ID"].(string); ok { + entry.BootID = bootID + } + if machineID, ok := journalData["_MACHINE_ID"].(string); ok { + entry.MachineID = machineID + } + + entry.Raw = jsonLine + + entry = jep.parseServiceSpecific(entry) + + return entry, nil +} + +func (jep *JournalEntryParser) getPriorityName(priority string) string { + priorityNames := map[string]string{ + "0": "emergency", + "1": "alert", + "2": "critical", + "3": "error", + "4": "warning", + "5": "notice", + "6": "info", + "7": "debug", + } + + if name, exists := priorityNames[priority]; exists { + return name + } + return "unknown" +} + +func (jep *JournalEntryParser) extractSystemdFields(journalData map[string]any, entry *LogEntry) { + systemdFields := []string{ + "_SYSTEMD_UNIT", "_SYSTEMD_USER_UNIT", "_SYSTEMD_SLICE", + "_BOOT_ID", "_MACHINE_ID", "_HOSTNAME", "_TRANSPORT", + "_CAP_EFFECTIVE", "_SELINUX_CONTEXT", "_AUDIT_SESSION", + "_AUDIT_LOGINUID", "_GID", "_UID", "_COMM", "_EXE", + "_CMDLINE", "_SYSTEMD_CGROUP", "_SYSTEMD_SESSION", + "_SYSTEMD_OWNER_UID", "_SOURCE_REALTIME_TIMESTAMP", + } + + for _, field := range systemdFields { + if value, ok := journalData[field]; ok { + esFieldName := strings.ToLower(strings.TrimPrefix(field, "_")) + entry.Fields[esFieldName] = value + } + } +} + +func (jep *JournalEntryParser) parseServiceSpecific(entry LogEntry) LogEntry { + switch jep.serviceName { + case "tixstream": + return parseTixstreamService(entry) + case "transfer-job-manager": + return parseTJMService(entry) + case "nginx": + return parseNginxService(entry) + default: + return entry + } +} + +func parseTixstreamService(entry LogEntry) LogEntry { + newEntry := entry + msg := strings.ReplaceAll(entry.Message, " ", " ") + parts := strings.Fields(msg) + if len(parts) < 5 { + return newEntry + } + + logLevel := parts[0] + timestampDate := parts[1] + timestampTime := parts[2] + transferID := parts[3] + info := parts[4:] + + if newEntry.Fields == nil { + newEntry.Fields = make(map[string]any) + } + newEntry.Fields["log_level"] = logLevel + newEntry.Fields["message_date"] = timestampDate + newEntry.Fields["message_time"] = timestampTime + newEntry.Fields["transfer_id"] = transferID + newEntry.Fields["log_message"] = strings.Join(info, " ") + + if info != nil { + var transferDirection string + var transferInfo []string + var queueStats []string + var logType string + switch info[0] { + case "in:": + logType = "direction_info" + transferDirection = "incoming" + transferInfo = info[1:] + case "out:": + logType = "direction_info" + transferDirection = "outgoing" + transferInfo = info[1:] + case "queue-stats:": + logType = "queue_stats" + queueStats = info[1:] + case "transfer:": + logType = "transfer_info" + transferInfo = info[1:] + default: + logType = "log_message" + transferDirection = "" + transferInfo = info + } + if logType != "" { + newEntry.Fields["log_type"] = logType + } + if transferDirection != "" { + newEntry.Fields["transfer_direction"] = transferDirection + } + if transferInfo != nil { + newEntry.Fields["transfer_info"] = strings.Join(transferInfo, ";") + } + if queueStats != nil { + newEntry.Fields["queue_stats"] = strings.Join(queueStats, ";") + } + } + + return newEntry +} + +func parseTJMService(entry LogEntry) LogEntry { + newEntry := entry + msg := strings.ReplaceAll(entry.Message, " ", " ") + msg = strings.ReplaceAll(msg, "---", "") + msg = strings.ReplaceAll(msg, " ", " ") + parts := strings.Fields(msg) + if len(parts) < 4 { + return newEntry + } + + timestampDate := parts[0] + timestampTime := parts[1] + logLevel := parts[2] + info := parts[3:] + + if newEntry.Fields == nil { + newEntry.Fields = make(map[string]any) + } + newEntry.Fields["log_level"] = logLevel + newEntry.Fields["message_date"] = timestampDate + newEntry.Fields["message_time"] = timestampTime + newEntry.Fields["message"] = strings.Join(info, " ") + + return newEntry +} + +func parseNginxService(entry LogEntry) LogEntry { + return entry +} diff --git a/system_metrics.go b/system_metrics.go new file mode 100644 index 0000000..54cbec0 --- /dev/null +++ b/system_metrics.go @@ -0,0 +1,154 @@ +package main + +import ( + "context" + "fmt" + "log/slog" + "slices" + "time" + + "github.com/elastic/go-elasticsearch/v7" + "github.com/shirou/gopsutil/cpu" + "github.com/shirou/gopsutil/disk" + "github.com/shirou/gopsutil/host" + "github.com/shirou/gopsutil/mem" + "github.com/shirou/gopsutil/net" +) + +type SystemMetricsCollector struct { + config SystemMetrics + pollInterval int +} + +func NewSystemMetricsCollector(config SystemMetrics, pollInterval int) *SystemMetricsCollector { + return &SystemMetricsCollector{ + config: config, + pollInterval: pollInterval, + } +} + +func (smc *SystemMetricsCollector) Start(ctx context.Context, es *elasticsearch.Client, baseIndex string) { + ticker := time.NewTicker(time.Duration(smc.pollInterval) * time.Second) + defer ticker.Stop() + + sender := NewElasticsearchSender(es) + + for { + select { + case <-ctx.Done(): + slog.Info("System metrics collector stopped") + return + case <-ticker.C: + metrics, err := smc.collectMetrics() + if err != nil { + slog.Error("error collecting system metrics", "error", err) + continue + } + + if err := sender.SendSystemMetrics(baseIndex, metrics); err != nil { + slog.Error("error sending system metrics", "error", err) + } + } + } +} + +func (smc *SystemMetricsCollector) collectMetrics() (SystemResources, error) { + result := NewSystemResources() + + var err error + + if smc.config.CollectCPU { + if err = smc.collectCPUMetrics(&result); err != nil { + return result, fmt.Errorf("CPU metrics: %w", err) + } + } + + if smc.config.CollectMemory { + if err = smc.collectMemoryMetrics(&result); err != nil { + return result, fmt.Errorf("memory metrics: %w", err) + } + } + + if smc.config.CollectDisk { + if err = smc.collectDiskMetrics(&result); err != nil { + return result, fmt.Errorf("disk metrics: %w", err) + } + } + + if smc.config.CollectNetwork { + if err = smc.collectNetworkMetrics(&result); err != nil { + return result, fmt.Errorf("network metrics: %w", err) + } + } + + return result, nil +} + +func (smc *SystemMetricsCollector) collectCPUMetrics(result *SystemResources) error { + cpuPercents, err := cpu.Percent(time.Second, false) + if err != nil { + return err + } + + if len(cpuPercents) > 0 { + result.CPUPercent = cpuPercents[0] + } + + if hostStat, err := host.Info(); err == nil { + result.Uptime = hostStat.Uptime + } + + return nil +} + +func (smc *SystemMetricsCollector) collectMemoryMetrics(result *SystemResources) error { + vmStat, err := mem.VirtualMemory() + if err != nil { + return err + } + + result.MemoryUsed = vmStat.Used + result.MemoryTotal = vmStat.Total + result.MemoryPercent = vmStat.UsedPercent + + return nil +} + +func (smc *SystemMetricsCollector) collectDiskMetrics(result *SystemResources) error { + for _, path := range smc.config.DiskPaths { + diskStat, err := disk.Usage(path) + if err != nil { + slog.Error("error reading disk stats", "path", path, "error", err) + continue + } + + result.DiskUsage[path] = DiskUsage{ + Used: diskStat.Used, + Total: diskStat.Total, + UsedPercent: diskStat.UsedPercent, + Free: diskStat.Free, + } + } + + return nil +} + +func (smc *SystemMetricsCollector) collectNetworkMetrics(result *SystemResources) error { + netStats, err := net.IOCounters(true) + if err != nil { + return err + } + + for _, stat := range netStats { + if len(smc.config.NetworkInterfaces) == 0 || slices.Contains(smc.config.NetworkInterfaces, stat.Name) { + result.NetworkStats[stat.Name] = NetworkStat{ + BytesSent: stat.BytesSent, + BytesRecv: stat.BytesRecv, + PacketsSent: stat.PacketsSent, + PacketsRecv: stat.PacketsRecv, + } + } + } + + return nil +} diff --git a/web_service.go b/web_service.go new file mode 100644 index 0000000..9d49ff5 --- /dev/null +++ b/web_service.go @@ -0,0 +1,202 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "net/http" + "strconv" + "strings" + "time" + + "github.com/elastic/go-elasticsearch/v7" +) + +type WebService struct { + server *http.Server + esClient *elasticsearch.Client + config *Config +} + +func NewWebService(config *Config, esClient *elasticsearch.Client) *WebService { + mux := http.NewServeMux() + + ws := &WebService{ + esClient: esClient, + config: config, + } + + mux.HandleFunc("/export", ws.handleExport) + mux.HandleFunc("/health", ws.handleHealth) + mux.HandleFunc("/indices", ws.handleIndices) + + addr := fmt.Sprintf("%s:%d", config.WebService.Host, config.WebService.Port) + ws.server = &http.Server{ + Addr: addr, + Handler: mux, + ReadTimeout: 30 * time.Second, + WriteTimeout: 300 * time.Second, + IdleTimeout: 60 * time.Second, + } + + return ws +} + +func (ws *WebService) Start(ctx context.Context) error { + go func() { + <-ctx.Done() + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := ws.server.Shutdown(shutdownCtx); err != nil { + slog.Error("web service shutdown error", "error", err) + } + }() + + slog.Info("Starting web service", "address", ws.server.Addr) + + if err := ws.server.ListenAndServe(); err != http.ErrServerClosed { + return fmt.Errorf("web service error: %w", err) + } + + return nil +} + +func (ws *WebService) handleExport(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + indices := ws.parseIndicesParam(r) + if len(indices) == 0 { + http.Error(w, "No indices specified. Use ?indices=index1,index2", http.StatusBadRequest) + return + } + + size := ws.parseSizeParam(r) + + slog.Info("Export request received", "indices", indices, "size", size) + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Content-Disposition", "attachment; filename=elasticsearch_export.json") + + exporter := NewElasticsearchExporter(ws.esClient) + if err := exporter.ExportToStream(r.Context(), indices, size, w); err != nil { + slog.Error("export error", "error", err) + http.Error(w, fmt.Sprintf("Export error: %v", err), http.StatusInternalServerError) + return + } + + slog.Info("Export completed successfully", "indices", indices) +} + +func (ws *WebService) handleHealth(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + res, err := ws.esClient.Info(ws.esClient.Info.WithContext(ctx)) + if err != nil { + w.WriteHeader(http.StatusServiceUnavailable) + json.NewEncoder(w).Encode(map[string]any{ + "status": "unhealthy", + "error": err.Error(), + }) + return + } + res.Body.Close() + + if res.IsError() { + w.WriteHeader(http.StatusServiceUnavailable) + json.NewEncoder(w).Encode(map[string]any{ + "status": "unhealthy", + "error": res.String(), + }) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]any{ + "status": "healthy", + "timestamp": time.Now(), + }) +} + +func (ws *WebService) handleIndices(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) + defer cancel() + + res, err := ws.esClient.Cat.Indices( + ws.esClient.Cat.Indices.WithContext(ctx), + ws.esClient.Cat.Indices.WithFormat("json"), + ) + if err != nil { + http.Error(w, fmt.Sprintf("Error fetching indices: %v", err), http.StatusInternalServerError) + return + } + defer res.Body.Close() + + if res.IsError() { + http.Error(w, fmt.Sprintf("Elasticsearch error: %s", res.String()), http.StatusInternalServerError) + return + } + + var indices []map[string]any + if err := json.NewDecoder(res.Body).Decode(&indices); err != nil { + http.Error(w, fmt.Sprintf("Error decoding response: %v", err), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]any{ + "indices": indices, + "count": len(indices), + }) +} + +func (ws *WebService) parseIndicesParam(r *http.Request) []string { + indicesParam := r.URL.Query().Get("indices") + if indicesParam == "" { + return nil + } + + indices := strings.Split(indicesParam, ",") + var result []string + for _, index := range indices { + index = strings.TrimSpace(index) + if index != "" { + result = append(result, index) + } + } + + return result +} + +func (ws *WebService) parseSizeParam(r *http.Request) int { + sizeParam := r.URL.Query().Get("size") + if sizeParam == "" { + return 1000 + } + + size, err := strconv.Atoi(sizeParam) + if err != nil || size <= 0 { + return 1000 + } + + if size > 10000 { + size = 10000 + } + + return size +}