diff --git a/build.sh b/build.sh deleted file mode 100755 index 3cebecf..0000000 --- a/build.sh +++ /dev/null @@ -1,23 +0,0 @@ -#!/bin/bash -set -e -PACKAGE_DIR="./packages" -PACKAGE_NAME="./tixel-watch" - -if [ -d "${PACKAGE_DIR}" ]; then - rm -rf "${PACKAGE_DIR:?}/"* -else - mkdir -p "${PACKAGE_DIR}" -fi - -if [ -d "${PACKAGE_NAME}" ]; then - rm -rf "${PACKAGE_NAME:?}" -fi - -CGO_ENABLED=0 go build -ldflags="-s -w" -o "$PACKAGE_DIR"/tixel-watch -buildvcs . - -cp -r ./tixel-watch.service $PACKAGE_DIR/ -cp -r ./configs/ $PACKAGE_DIR/ -cp -r ./install.sh $PACKAGE_DIR/ -mv $PACKAGE_DIR tixel-watch - -tar -czvf tixel-watch.tar.gz ./tixel-watch diff --git a/config.go b/config.go index 065f3f0..8c4ade2 100644 --- a/config.go +++ b/config.go @@ -2,8 +2,9 @@ package main import ( "fmt" - "log" "log/slog" + "os" + "regexp" "time" "github.com/spf13/viper" @@ -57,8 +58,9 @@ type ElasticsearchConfig struct { } type LocalStorage struct { - Enable bool `mapstructure:"enabled"` - DBPath string `mapstructure:"db_path"` + Enable bool `mapstructure:"enabled"` + DBPath string `mapstructure:"db_path"` + RotationConfig StorageRotationConfig `mapstructure:"rotation"` } type SystemMetrics struct { @@ -95,12 +97,88 @@ type Config struct { Level string `mapstructure:"level"` FilePath string `mapstructure:"file_path"` } `mapstructure:"logging"` + PatternsFile string `mapstructure:"patterns_file"` + Drain3 Drain3Config `mapstructure:"drain3"` +} + +type Drain3Config struct { + Enabled bool `mapstructure:"enabled"` + StateDir string `mapstructure:"state_dir"` + Depth int `mapstructure:"depth"` + SimThreshold float64 `mapstructure:"sim_th"` + MaxChildren int `mapstructure:"max_children"` + SaveIntervalSeconds int `mapstructure:"save_interval"` +} + +type StorageRotationConfig struct { + // MaxSizeBytes is the maximum size of the database in bytes (0 = deactivated) + MaxSizeBytes int64 `mapstructure:"max_size_bytes"` + // MaxAgeHours is the maximum age of the database (0 = deaactivated) + MaxAgeHours time.Duration `mapstructure:"max_age_hours"` + // MaxFiles is the maximum count of old files, to keep + MaxFiles int `mapstructure:"max_files"` + // CheckIntervalMinutes is the intervall for checking rotation conditions + CheckIntervalMinutes time.Duration `mapstructure:"check_interval_minutes"` + // ArchiveDir is the dir to store archived files (empty = same dir as db) + ArchiveDir string `mapstructure:"archive_dir"` +} + +func (src StorageRotationConfig) GetMaxAge() time.Duration { + if src.MaxAgeHours <= 0 { + return 0 + } + return time.Duration(src.MaxAgeHours) * time.Hour +} + +func (src StorageRotationConfig) GetCheckInterval() time.Duration { + if src.CheckIntervalMinutes <= 0 { + return 5 * time.Minute + } + return time.Duration(src.CheckIntervalMinutes) * time.Minute +} + +func setConfigDefaults() { + viper.SetDefault("poll_interval_seconds", 30) + viper.SetDefault("elasticsearch.timeout", 30) + viper.SetDefault("system_metrics.enabled", true) + viper.SetDefault("system_metrics.collect_cpu", true) + viper.SetDefault("system_metrics.collect_memory", true) + viper.SetDefault("system_metrics.collect_disk", true) + viper.SetDefault("system_metrics.collect_network", false) + viper.SetDefault("system_metrics.disk_paths", []string{"/"}) + viper.SetDefault("web_service.enabled", false) + viper.SetDefault("web_service.port", 8080) + viper.SetDefault("web_service.host", "localhost") + viper.SetDefault("logging.level", "info") + viper.SetDefault("export.enabled", true) + viper.SetDefault("export.batch_size", 100) + viper.SetDefault("export.export_interval", "30s") + viper.SetDefault("export.retry_attempts", 3) + viper.SetDefault("export.retry_backoff", "5s") + viper.SetDefault("export.health_check_interval", "60s") + viper.SetDefault("localstorage.enabled", true) + viper.SetDefault("localstorage.db_path", "./watch.db") + viper.SetDefault("localstorage.rotation.max_size_bytes", int64(100*1024*1024)) + viper.SetDefault("localstorage.rotation.max_age_hours", 24) + viper.SetDefault("localstorage.rotation.max_files", 7) + viper.SetDefault("localstorage.rotation.check_interval_minutes", 5) + viper.SetDefault("localstorage.rotation.archive_dir", "") + viper.SetDefault("patterns_file", "./configs/patterns.yaml") + viper.SetDefault("drain3.enabled", true) + viper.SetDefault("drain3.state_dir", "./drain3_states") + viper.SetDefault("drain3.depth", 4) + viper.SetDefault("drain3.sim_th", 0.4) + viper.SetDefault("drain3.max_children", 100) } func LoadConfig() (*Config, error) { + home, err := os.UserConfigDir() + if err != nil { + return nil, fmt.Errorf("unable to get user config dir: %w", err) + } viper.SetConfigName("config") viper.AddConfigPath(".") - viper.AddConfigPath("/opt/tixel/tixel-watch/") + viper.AddConfigPath(home) viper.SetConfigType("yaml") setConfigDefaults() @@ -121,92 +199,7 @@ func LoadConfig() (*Config, error) { return &cfg, nil } -func setConfigDefaults() { - viper.SetDefault("poll_interval_seconds", 30) - viper.SetDefault("elasticsearch.timeout", 30) - viper.SetDefault("system_metrics.enabled", true) - viper.SetDefault("system_metrics.collect_cpu", true) - viper.SetDefault("system_metrics.collect_memory", true) - viper.SetDefault("system_metrics.collect_disk", true) - viper.SetDefault("system_metrics.collect_network", false) - viper.SetDefault("system_metrics.disk_paths", []string{"/"}) - viper.SetDefault("web_service.enabled", false) - viper.SetDefault("web_service.port", 8080) - viper.SetDefault("web_service.host", "localhost") - viper.SetDefault("logging.level", "info") -} - -func setConfigDefaultsV2() { - viper.SetDefault("poll_interval_seconds", 30) - viper.SetDefault("elasticsearch.timeout", 30) - viper.SetDefault("system_metrics.enabled", true) - viper.SetDefault("system_metrics.collect_cpu", true) - viper.SetDefault("system_metrics.collect_memory", true) - viper.SetDefault("system_metrics.collect_disk", true) - viper.SetDefault("system_metrics.collect_network", false) - viper.SetDefault("system_metrics.disk_paths", []string{"/"}) - viper.SetDefault("web_service.enabled", false) - viper.SetDefault("web_service.port", 8080) - viper.SetDefault("web_service.host", "localhost") - viper.SetDefault("logging.level", "info") - viper.SetDefault("export.enabled", true) - viper.SetDefault("export.batch_size", 100) - viper.SetDefault("export.export_interval", "30s") - viper.SetDefault("export.retry_attempts", 3) - viper.SetDefault("export.retry_backoff", "5s") - viper.SetDefault("export.health_check_interval", "60s") - viper.SetDefault("localstorage.enabled", true) - viper.SetDefault("localstorage.db_path", "./tixel_watch.db") -} - func validateConfig(cfg *Config) error { - if cfg.Elasticsearch.URL == "" { - return fmt.Errorf("elasticsearch.url is required") - } - - if cfg.Elasticsearch.Index == "" { - return fmt.Errorf("elasticsearch.index is required") - } - - if cfg.PollIntervalSeconds <= 0 { - log.Printf("Warning: poll_interval_seconds is %d, setting to 30", cfg.PollIntervalSeconds) - cfg.PollIntervalSeconds = 30 - } - - for i := range cfg.Tools { - if cfg.Tools[i].BufferSize <= 0 { - cfg.Tools[i].BufferSize = 100 - } - } - - return nil -} - -func LoadConfigV2() (*Config, error) { - viper.SetConfigName("config") - viper.AddConfigPath(".") - viper.AddConfigPath("/opt/tixel/tixel-watch/") - viper.SetConfigType("yaml") - - setConfigDefaultsV2() - - if err := viper.ReadInConfig(); err != nil { - return nil, fmt.Errorf("error reading config: %w", err) - } - - var cfg Config - if err := viper.Unmarshal(&cfg); err != nil { - return nil, fmt.Errorf("error parsing config: %w", err) - } - - if err := validateConfigV2(&cfg); err != nil { - return nil, fmt.Errorf("config validation failed: %w", err) - } - - return &cfg, nil -} - -func validateConfigV2(cfg *Config) error { if !cfg.LocalStorage.Enable { return fmt.Errorf("local storage must be enabled in the new architecture") } @@ -247,10 +240,36 @@ func validateConfigV2(cfg *Config) error { } } - for i := range cfg.Tools { - if cfg.Tools[i].BufferSize <= 0 { - cfg.Tools[i].BufferSize = 100 + for _, tool := range cfg.Tools { + if tool.BufferSize <= 0 { + tool.BufferSize = 100 } + + if tool.Format.Pattern != "" { + if _, err := regexp.Compile(tool.Format.Pattern); err != nil { + return fmt.Errorf("invalid regex for tool '%s': %w", tool.Name, err) + } + } + } + + if cfg.LocalStorage.RotationConfig.MaxSizeBytes < 0 { + slog.Warn("Invalid rotation max_size_bytes, setting to 100MB", "value", cfg.LocalStorage.RotationConfig.MaxSizeBytes) + cfg.LocalStorage.RotationConfig.MaxSizeBytes = 100 * 1024 * 1024 + } + + if cfg.LocalStorage.RotationConfig.MaxAgeHours < 0 { + slog.Warn("Invalid rotation max_age_hours, setting to 24", "value", cfg.LocalStorage.RotationConfig.MaxAgeHours) + cfg.LocalStorage.RotationConfig.MaxAgeHours = 24 + } + + if cfg.LocalStorage.RotationConfig.MaxFiles < 0 { + slog.Warn("Invalid rotation max_files, setting to 7", "value", cfg.LocalStorage.RotationConfig.MaxFiles) + cfg.LocalStorage.RotationConfig.MaxFiles = 7 + } + + if cfg.LocalStorage.RotationConfig.CheckIntervalMinutes < 1 { + slog.Warn("Invalid rotation check_interval_minutes, setting to 5", "value", cfg.LocalStorage.RotationConfig.CheckIntervalMinutes) + cfg.LocalStorage.RotationConfig.CheckIntervalMinutes = 5 } return nil diff --git a/configs/elasticsearch.yml b/configs/elasticsearch.yml deleted file mode 100644 index fdd1f9c..0000000 --- a/configs/elasticsearch.yml +++ /dev/null @@ -1,119 +0,0 @@ -# ======================== Elasticsearch Configuration ========================= -# -# NOTE: Elasticsearch comes with reasonable defaults for most settings. -# Before you set out to tweak and tune the configuration, make sure you -# understand what are you trying to accomplish and the consequences. -# -# The primary way of configuring a node is via this file. This template lists -# the most important settings you may want to configure for a production cluster. -# -# Please consult the documentation for further information on configuration options: -# https://www.elastic.co/guide/en/elasticsearch/reference/index.html -# -# ---------------------------------- Cluster ----------------------------------- -# -# Use a descriptive name for your cluster: -# -cluster.name: tixel-elastic -# -# ------------------------------------ Node ------------------------------------ -# -# Use a descriptive name for the node: -# -#node.name: node-1 -# -# Add custom attributes to the node: -# -#node.attr.rack: r1 -# -# ----------------------------------- Paths ------------------------------------ -# -# Path to directory where to store the data (separate multiple locations by comma): -# -path.data: /var/lib/elasticsearch -# -# Path to log files: -# -path.logs: /var/log/elasticsearch -# -# ----------------------------------- Memory ----------------------------------- -# -# Lock the memory on startup: -# -#bootstrap.memory_lock: true -# -# Make sure that the heap size is set to about half the memory available -# on the system and that the owner of the process is allowed to use this -# limit. -# -# Elasticsearch performs poorly when the system is swapping the memory. -# -# ---------------------------------- Network ----------------------------------- -# -# By default Elasticsearch is only accessible on localhost. Set a different -# address here to expose this node on the network: -# -network.host: 0.0.0.0 -# -# By default Elasticsearch listens for HTTP traffic on the first free port it -# finds starting at 9200. Set a specific HTTP port here: -# -#http.port: 9200 -# -# For more information, consult the network module documentation. -# -# --------------------------------- Discovery ---------------------------------- -# -# Pass an initial list of hosts to perform discovery when this node is started: -# The default list of hosts is ["127.0.0.1", "[::1]"] -# -#discovery.seed_hosts: ["host1", "host2"] -# -# Bootstrap the cluster using an initial set of master-eligible nodes: -# -#cluster.initial_master_nodes: ["node-1", "node-2"] -# -# For more information, consult the discovery and cluster formation module documentation. -# -# ---------------------------------- Various ----------------------------------- -# -# Allow wildcard deletion of indices: -# -#action.destructive_requires_name: false - -#----------------------- BEGIN SECURITY AUTO CONFIGURATION ----------------------- -# -# The following settings, TLS certificates, and keys have been automatically -# generated to configure Elasticsearch security features on 26-08-2025 14:51:23 -# -# -------------------------------------------------------------------------------- - -# Enable security features -xpack.security.enabled: false - -xpack.security.enrollment.enabled: false - -# Enable encryption for HTTP API client connections, such as Kibana, Logstash, and Agents -xpack.security.http.ssl: - enabled: false - keystore.path: certs/http.p12 - -# Enable encryption and mutual authentication between cluster nodes -xpack.security.transport.ssl: - enabled: false - verification_mode: certificate - keystore.path: certs/transport.p12 - truststore.path: certs/transport.p12 -# Create a new cluster with the current node only -# Additional nodes can still join the cluster later -cluster.initial_master_nodes: ["frankfurt.tixeltec.de"] - -# Allow HTTP API connections from anywhere -# Connections are encrypted and require user authentication -http.host: 0.0.0.0 - -# Allow other nodes to join the cluster from anywhere -# Connections are encrypted and mutually authenticated -transport.host: 0.0.0.0 - -#----------------------- END SECURITY AUTO CONFIGURATION ------------------------- diff --git a/configs/config.yaml b/configs/example-config.yaml similarity index 60% rename from configs/config.yaml rename to configs/example-config.yaml index 6190a5f..706b863 100644 --- a/configs/config.yaml +++ b/configs/example-config.yaml @@ -1,35 +1,74 @@ + +export: + enabled: true + batch_size: 100 + export_interval: "30s" + retry_attempts: 5 + retry_backoff: "10s" + health_check_interval: "60s" + +localstorage: + enabled: true + db_path: "./watch.db" + rotation: + max_sizes_bytes: 100 * 1024 * 1024 + max_age_hours: 24 + max_files: 3 + check_interval_minuntes: 5 + archive_dir: "" + elasticsearch: - url: "http://localhost:9200" - index: "tixel" - username: "elastic" - password: "79QQ4JGTa3R_nkqA=MxW" + enabled: true + url: "http://10.0.0.99:9200" + index: "watch" + username: "your-configured-user" + password: "your-super-secret-password" + api_key: "your-api-key" timeout: 30 web_service: enabled: true - host: "localhost" - port: 8080 + host: "0.0.0.0" + port: 9090 system_metrics: enabled: true collect_cpu: true collect_memory: true collect_disk: true - collect_network: false + collect_network: true disk_paths: - "/" - "/var" - "/home" network_interfaces: - - "eth0" - - "wlan0" + - "ens6" + collect_network_connections: true + collect_load_average: true + collect_tcp_stats: true + collect_filehandles: true + collect_disk_io: true + collect_network_latency: true + collect_bandwidth_usage: true + transfer_ports: 60003 + latency_test_hosts: "www.google.de" poll_interval_seconds: 30 +patterns_file: "./configs/patterns.yaml" logging: level: "info" file_path: "/var/log/system-monitor.log" +drain3: + enabled: true + state_dir: "./drain3_states" + depth: 4 + sim_th: 0.4 + max_children: 100 + max_clusters: 1000 + save_interval: 60 + services: - name: "nginx" service: "nginx.service" @@ -37,18 +76,6 @@ services: since_time: "" priority: "info" - - name: "tixstream" - service: "tixstream.service" - enabled: true - since_time: "" - priority: "debug" - - - name: "transfer-job-manager" - service: "transfer-job-manager.service" - enabled: true - since_time: "" - priority: "debug" - tools: - name: "nginx-access" log_file: "/var/log/nginx/access.log" @@ -82,9 +109,3 @@ tools: tid: "thread_id" message: "error_message" - - name: "nginx-tjm" - log_file: "/var/log/nginx/access_tjm.log" - enabled: true - buffer_size: 100 - format: - name: "custom" diff --git a/configs/example-patterns.yaml b/configs/example-patterns.yaml new file mode 100644 index 0000000..56ce629 --- /dev/null +++ b/configs/example-patterns.yaml @@ -0,0 +1,36 @@ +patterns: + common: + extractors: + - name: "syslog_header" + regex: '^(\w{3} \d{2} \d{2}:\d{2}:\d{2}) (?P[^\s]+) (?P[^:]+):\s*(?P.*)$' + fields: + syslog_timestamp: "time:Jan 02 15:04:05" + hostname: "string" + process_info: "string" + message_rest: "string" + + - name: "iso8601_timestamp" + regex: '(?P\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z?)' + fields: + timestamp: "time:2006-01-02T15:04:05.000000Z" + + nginx: + extractors: + - name: "access_log" + regex: '^(?P\S+)\s+\S+\s+(?P\S+)\s+\[(?P[^\]]+)\]\s+"(?P[^"]+)"\s+(?P\d+)\s+(?P\d+|-)' + fields: + client_ip: "string" + remote_user: "string" + timestamp_nginx: "string" + request: "string" + status_code: "int" + bytes_sent: "int" + + my-app: + extractors: + - name: "app_log" + regex: '^\[(?P\w+)\] id=(?P\d+) duration=(?P\d+)ms' + fields: + level: "string" + request_id: "int" + duration_ms: "int" diff --git a/elasticsearch.go b/elasticsearch.go index 3b3537d..b897cb8 100644 --- a/elasticsearch.go +++ b/elasticsearch.go @@ -7,7 +7,7 @@ import ( "log/slog" "strings" "time" - "tixel_watch/models" + "watch-tool/models" "github.com/elastic/go-elasticsearch/v8" ) @@ -70,8 +70,7 @@ func (esc *ElasticsearchClient) SendBatch(baseIndex string, entries []models.Log var body strings.Builder for _, entry := range entries { - // indexName := determineIndexName(baseIndex, entry) - indexName := "tixel" + indexName := "watch" indexLine := fmt.Sprintf(`{"index":{"_index":"%s"}}`, indexName) body.WriteString(indexLine) @@ -118,7 +117,7 @@ func (esc *ElasticsearchClient) SendSystemMetrics(baseIndex string, metrics mode return fmt.Errorf("JSON marshalling error: %w", err) } - systemIndex := "tixel" + systemIndex := "watch" ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -146,14 +145,3 @@ func (esc *ElasticsearchClient) SendSystemMetrics(baseIndex string, metrics mode return nil } - -// func determineIndexName(baseIndex string, entry LogEntry) string { -// switch entry.Type { -// case "system_metrics": -// return fmt.Sprintf("%s-system", baseIndex) -// case "service_log": -// return fmt.Sprintf("%s-service-%s", baseIndex, entry.Service) -// default: -// return fmt.Sprintf("%s-%s", baseIndex, entry.Tool) -// } -// } diff --git a/elasticsearch_exporter.go b/elasticsearch_exporter.go index 0a36f5f..55f4dbe 100644 --- a/elasticsearch_exporter.go +++ b/elasticsearch_exporter.go @@ -4,280 +4,87 @@ import ( "context" "encoding/json" "fmt" - "io" "log/slog" "strings" "time" + "watch-tool/models" "github.com/elastic/go-elasticsearch/v8" ) type ElasticsearchExporter struct { client *elasticsearch.Client + config ElasticsearchConfig } -func NewElasticsearchExporter(client *elasticsearch.Client) *ElasticsearchExporter { +func NewElasticsearchExporter(config ElasticsearchConfig) (*ElasticsearchExporter, error) { + client, err := NewElasticsearchClient(config) + if err != nil { + return nil, fmt.Errorf("failed to create Elasticsearch client: %w", err) + } + return &ElasticsearchExporter{ client: client, - } + config: config, + }, nil } -type ExportResult struct { - Index string `json:"index"` - DocumentCount int `json:"document_count"` - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` - Duration string `json:"duration"` - Error string `json:"error,omitempty"` -} - -func (e *ElasticsearchExporter) ExportToStream(ctx context.Context, indices []string, batchSize int, since int, writer io.Writer) error { - startTime := time.Now() - - if _, err := writer.Write([]byte("{\n \"export_info\": {\n")); err != nil { - return fmt.Errorf("error writing export header: %w", err) +func (e *ElasticsearchExporter) Export(ctx context.Context, entries []models.LogMessage) error { + if len(entries) == 0 { + return nil } - exportInfo := map[string]any{ - "timestamp": startTime, - "indices": indices, - "batch_size": batchSize, - "sinceDays": since, - } + var body strings.Builder + for _, entry := range entries { + indexName := e.config.Index - infoBytes, err := json.MarshalIndent(exportInfo, " ", " ") - if err != nil { - return fmt.Errorf("error marshalling export info: %w", err) - } + indexLine := fmt.Sprintf(`{"index":{"_index":"%s"}}`, indexName) + body.WriteString(indexLine) + body.WriteString("\n") - infoStr := string(infoBytes) - infoStr = strings.TrimPrefix(infoStr, "{") - infoStr = strings.TrimSuffix(infoStr, "}") - - if _, err := writer.Write([]byte(infoStr)); err != nil { - return fmt.Errorf("error writing export info: %w", err) - } - - if _, err := writer.Write([]byte("\n },\n \"data\": {\n")); err != nil { - return fmt.Errorf("error writing data header: %w", err) - } - - results := make([]ExportResult, 0, len(indices)) - first := true - - for _, index := range indices { - if !first { - if _, err := writer.Write([]byte(",\n")); err != nil { - return fmt.Errorf("error writing separator: %w", err) - } - } - first = false - - result := e.exportIndex(ctx, index, batchSize, since, writer) - results = append(results, result) - - if result.Error != "" { - slog.Error("error exporting index", "index", index, "error", result.Error) + data, err := json.Marshal(entry) + if err != nil { + slog.Error("error marshalling JSON", "error", err) + continue } + body.WriteString(string(data)) + body.WriteString("\n") } - if _, err := writer.Write([]byte("\n },\n \"results\": ")); err != nil { - return fmt.Errorf("error writing results header: %w", err) - } + timeout := time.Duration(e.config.Timeout) * time.Second + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() - if err := json.NewEncoder(writer).Encode(results); err != nil { - return fmt.Errorf("error writing results: %w", err) - } - - if _, err := writer.Write([]byte("}\n")); err != nil { - return fmt.Errorf("error writing final bracket: %w", err) - } - - duration := time.Since(startTime) - slog.Info("Export completed", "duration", duration, "indices_count", len(indices)) - - return nil -} - -func (e *ElasticsearchExporter) exportIndex(ctx context.Context, index string, batchSize int, since int, writer io.Writer) ExportResult { - startTime := time.Now() - result := ExportResult{ - Index: index, - StartTime: startTime, - } - - if _, err := fmt.Fprintf(writer, " \"%s\": [\n", index); err != nil { - result.Error = fmt.Sprintf("error writing index header: %v", err) - result.EndTime = time.Now() - result.Duration = time.Since(startTime).String() - return result - } - - query := `{"query":{"match_all":{}}}` - if since > 0 { - query = fmt.Sprintf(`{ - "query": { - "range": { - "timestamp": { - "gte": "now-%dd/d", - "lt": "now/d" - } - } - } -}`, since) - } - res, err := e.client.Search( - e.client.Search.WithContext(ctx), - e.client.Search.WithIndex(index), - e.client.Search.WithScroll(1000), - e.client.Search.WithSize(batchSize), - e.client.Search.WithBody(strings.NewReader(query)), + res, err := e.client.Bulk( + strings.NewReader(body.String()), + e.client.Bulk.WithContext(ctx), ) if err != nil { - result.Error = fmt.Sprintf("error in initial search: %v", err) - result.EndTime = time.Now() - result.Duration = time.Since(startTime).String() - return result + return fmt.Errorf("bulk request error: %w", err) } defer res.Body.Close() if res.IsError() { - result.Error = fmt.Sprintf("elasticsearch error: %s", res.String()) - result.EndTime = time.Now() - result.Duration = time.Since(startTime).String() - return result + return fmt.Errorf("bulk request failed: %s", res.String()) } - var searchResult map[string]any - if err := json.NewDecoder(res.Body).Decode(&searchResult); err != nil { - result.Error = fmt.Sprintf("error decoding search result: %v", err) - result.EndTime = time.Now() - result.Duration = time.Since(startTime).String() - return result - } - - scrollID, ok := searchResult["_scroll_id"].(string) - if !ok { - result.Error = "no scroll_id found in search result" - result.EndTime = time.Now() - result.Duration = time.Since(startTime).String() - return result - } - - hits := searchResult["hits"].(map[string]any)["hits"].([]any) - firstDocument := true - documentCount := 0 - - for _, hit := range hits { - if !firstDocument { - if _, err := writer.Write([]byte(",\n")); err != nil { - result.Error = fmt.Sprintf("error writing separator: %v", err) - result.EndTime = time.Now() - result.Duration = time.Since(startTime).String() - return result - } - } - firstDocument = false - - source := hit.(map[string]any)["_source"] - if err := e.writeDocument(writer, source); err != nil { - result.Error = fmt.Sprintf("error writing document: %v", err) - result.EndTime = time.Now() - result.Duration = time.Since(startTime).String() - return result - } - documentCount++ - } - - for { - select { - case <-ctx.Done(): - result.Error = "context cancelled" - result.EndTime = time.Now() - result.Duration = time.Since(startTime).String() - return result - default: - } - - scrollRes, err := e.client.Scroll( - e.client.Scroll.WithScrollID(scrollID), - e.client.Scroll.WithScroll(1000), - e.client.Scroll.WithContext(ctx), - ) - if err != nil { - result.Error = fmt.Sprintf("error in scroll request: %v", err) - break - } - defer scrollRes.Body.Close() - - if scrollRes.IsError() { - result.Error = fmt.Sprintf("elasticsearch scroll error: %s", scrollRes.String()) - break - } - - var scrollResult map[string]any - if err := json.NewDecoder(scrollRes.Body).Decode(&scrollResult); err != nil { - result.Error = fmt.Sprintf("error decoding scroll result: %v", err) - break - } - - hits := scrollResult["hits"].(map[string]any)["hits"].([]any) - if len(hits) == 0 { - break - } - - scrollID, _ = scrollResult["_scroll_id"].(string) - - for _, hit := range hits { - if _, err := writer.Write([]byte(",\n")); err != nil { - result.Error = fmt.Sprintf("error writing separator: %v", err) - result.EndTime = time.Now() - result.Duration = time.Since(startTime).String() - return result - } - - source := hit.(map[string]any)["_source"] - if err := e.writeDocument(writer, source); err != nil { - result.Error = fmt.Sprintf("error writing document: %v", err) - result.EndTime = time.Now() - result.Duration = time.Since(startTime).String() - return result - } - documentCount++ - } - } - - if _, err := writer.Write([]byte("\n ]")); err != nil { - if result.Error == "" { - result.Error = fmt.Sprintf("error writing index footer: %v", err) - } - } - - result.DocumentCount = documentCount - result.EndTime = time.Now() - result.Duration = time.Since(startTime).String() - - slog.Info("Index export completed", - "index", index, - "documents", documentCount, - "duration", result.Duration, - ) - - return result + slog.Debug("Batch successfully exported to Elasticsearch", "count", len(entries)) + return nil } -func (e *ElasticsearchExporter) writeDocument(writer io.Writer, document any) error { - jsonBytes, err := json.MarshalIndent(document, " ", " ") +func (e *ElasticsearchExporter) HealthCheck(ctx context.Context) error { + timeout := time.Duration(e.config.Timeout) * time.Second + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + res, err := e.client.Info(e.client.Info.WithContext(ctx)) if err != nil { - return fmt.Errorf("error marshalling document: %w", err) + return fmt.Errorf("health check failed: %w", err) } + defer res.Body.Close() - if _, err := writer.Write([]byte(" ")); err != nil { - return err - } - - if _, err := writer.Write(jsonBytes); err != nil { - return err + if res.IsError() { + return fmt.Errorf("health check failed: %s", res.String()) } return nil diff --git a/elasticsearch_exporterV2.go b/elasticsearch_exporterV2.go deleted file mode 100644 index b0205e5..0000000 --- a/elasticsearch_exporterV2.go +++ /dev/null @@ -1,91 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "fmt" - "log/slog" - "strings" - "time" - "tixel_watch/models" - - "github.com/elastic/go-elasticsearch/v8" -) - -type ElasticsearchExporterV2 struct { - client *elasticsearch.Client - config ElasticsearchConfig -} - -func NewElasticsearchExporterV2(config ElasticsearchConfig) (*ElasticsearchExporterV2, error) { - client, err := NewElasticsearchClient(config) - if err != nil { - return nil, fmt.Errorf("failed to create Elasticsearch client: %w", err) - } - - return &ElasticsearchExporterV2{ - client: client, - config: config, - }, nil -} - -func (e *ElasticsearchExporterV2) Export(ctx context.Context, entries []models.LogMessage) error { - if len(entries) == 0 { - return nil - } - - var body strings.Builder - for _, entry := range entries { - indexName := e.config.Index - - indexLine := fmt.Sprintf(`{"index":{"_index":"%s"}}`, indexName) - body.WriteString(indexLine) - body.WriteString("\n") - - data, err := json.Marshal(entry) - if err != nil { - slog.Error("error marshalling JSON", "error", err) - continue - } - body.WriteString(string(data)) - body.WriteString("\n") - } - - timeout := time.Duration(e.config.Timeout) * time.Second - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - - res, err := e.client.Bulk( - strings.NewReader(body.String()), - e.client.Bulk.WithContext(ctx), - ) - if err != nil { - return fmt.Errorf("bulk request error: %w", err) - } - defer res.Body.Close() - - if res.IsError() { - return fmt.Errorf("bulk request failed: %s", res.String()) - } - - slog.Debug("Batch successfully exported to Elasticsearch", "count", len(entries)) - return nil -} - -func (e *ElasticsearchExporterV2) HealthCheck(ctx context.Context) error { - timeout := time.Duration(e.config.Timeout) * time.Second - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - - res, err := e.client.Info(e.client.Info.WithContext(ctx)) - if err != nil { - return fmt.Errorf("health check failed: %w", err) - } - defer res.Body.Close() - - if res.IsError() { - return fmt.Errorf("health check failed: %s", res.String()) - } - - return nil -} diff --git a/export_manager.go b/export_manager.go index bf0ee72..a7fe501 100644 --- a/export_manager.go +++ b/export_manager.go @@ -8,7 +8,7 @@ import ( "strings" "sync" "time" - "tixel_watch/models" + "watch-tool/models" ) type ExportManager struct { diff --git a/exporter_interface.go b/exporter_interface.go new file mode 100644 index 0000000..5753f9e --- /dev/null +++ b/exporter_interface.go @@ -0,0 +1,11 @@ +package main + +import ( + "context" + "watch-tool/models" +) + +type ExporterInterface interface { + Export(ctx context.Context, entries []models.LogMessage) error + HealthCheck(ctx context.Context) error +} diff --git a/file_monitor.go b/file_monitor.go index ab2c1de..608bd17 100644 --- a/file_monitor.go +++ b/file_monitor.go @@ -6,47 +6,67 @@ import ( "log/slog" "regexp" "strings" - "tixel_watch/models" - "tixel_watch/parser" + "watch-tool/models" + "watch-tool/parser" + "watch-tool/patterns" + "codeberg.org/pata1704/drain3" "github.com/hpcloud/tail" ) type FileMonitor struct { - config ToolConfig - parser parser.Parser + config ToolConfig + parser parser.Parser + hostname string } -func NewFileMonitor(config ToolConfig) *FileMonitor { +func NewFileMonitor(config ToolConfig, hostname string, drainCfg *drain3.Config, stateDir string) *FileMonitor { var logParser parser.Parser + pCfg := parser.ParserConfig{ + ServiceName: config.Name, + LogType: "custom", + Hostname: hostname, + DrainConfig: drainCfg, + StateDir: stateDir, + } + if config.Format.Pattern != "" { - pattern, err := regexp.Compile(config.Format.Pattern) + compiledRegex, err := regexp.Compile(config.Format.Pattern) if err != nil { - slog.Error("invalid regex pattern", "tool", config.Name, "error", err) - logParser = &parser.DefaultParser{} + slog.Error("Invalid regex pattern in tool config", "tool", config.Name, "error", err) + logParser = parser.NewGenericParser(config.Name, hostname, pCfg.DrainConfig, pCfg.StateDir) + } else { - logParser = &parser.RegexLogParser{ - Pattern: pattern, - Fields: config.Format.Fields, - Toolname: config.Name, + gp := parser.NewGenericParser(config.Name, hostname, pCfg.DrainConfig, pCfg.StateDir) + + customExtractor := patterns.CompiledExtractor{ + Name: "config_custom_pattern", + Pattern: compiledRegex, + Fields: config.Format.Fields, } + + gp.Extractors = append(gp.Extractors, customExtractor) + logParser = gp } } else { var err error - logParser, err = parser.New(config.Name, "custom") + logParser, err = parser.New(pCfg) if err != nil { - slog.Error("cannot get tool specific parser", "error", err) + slog.Error("Cannot get tool specific parser from factory", "error", err) + logParser = parser.NewGenericParser(config.Name, hostname, pCfg.DrainConfig, pCfg.StateDir) } } return &FileMonitor{ - config: config, - parser: logParser, + config: config, + parser: logParser, + hostname: hostname, } } func (fm *FileMonitor) Start(ctx context.Context, out chan<- models.LogMessage) error { + defer fm.parser.Close() t, err := tail.TailFile(fm.config.LogFile, tail.Config{ Follow: true, ReOpen: true, @@ -72,7 +92,7 @@ func (fm *FileMonitor) Start(ctx context.Context, out chan<- models.LogMessage) } if line.Err != nil { - slog.Error("error reading log file", "tool", fm.config.Name, "error", line.Err) + slog.Error("Error reading log file", "tool", fm.config.Name, "error", line.Err) continue } @@ -82,7 +102,11 @@ func (fm *FileMonitor) Start(ctx context.Context, out chan<- models.LogMessage) entry, err := fm.parser.Parse(line.Text) if err != nil { - slog.Error("error parsing log line", "error", err) + slog.Error("Error parsing log line", "tool", fm.config.Name, "error", err) + } else { + if entry.Tool == "" { + entry.Tool = fm.config.Name + } } select { diff --git a/go.mod b/go.mod index fdc6247..648e926 100644 --- a/go.mod +++ b/go.mod @@ -1,14 +1,16 @@ -module tixel_watch +module watch-tool -go 1.24.1 +go 1.25.5 require ( + codeberg.org/pata1704/drain3 v1.0.0 github.com/elastic/go-elasticsearch/v8 v8.19.0 github.com/hpcloud/tail v1.0.0 github.com/shirou/gopsutil v3.21.11+incompatible github.com/spf13/viper v1.20.1 - golang.org/x/sys v0.34.0 - modernc.org/sqlite v1.39.0 + golang.org/x/sys v0.37.0 + gopkg.in/yaml.v3 v3.0.1 + modernc.org/sqlite v1.44.1 ) require ( @@ -21,7 +23,7 @@ require ( github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/google/uuid v1.6.0 // indirect github.com/mattn/go-isatty v0.0.20 // indirect - github.com/ncruces/go-strftime v0.1.9 // indirect + github.com/ncruces/go-strftime v1.0.0 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/sagikazarmark/locafero v0.7.0 // indirect @@ -38,12 +40,11 @@ require ( go.opentelemetry.io/otel/trace v1.29.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect - golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect + golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect golang.org/x/text v0.21.0 // indirect gopkg.in/fsnotify.v1 v1.4.7 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect - modernc.org/libc v1.66.3 // indirect + modernc.org/libc v1.67.6 // indirect modernc.org/mathutil v1.7.1 // indirect modernc.org/memory v1.11.0 // indirect ) diff --git a/go.sum b/go.sum index 01d3bef..c69400f 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +codeberg.org/pata1704/drain3 v1.0.0 h1:X66fn+lnzOMU+PFFSkNBF89z1ghbqihE1I4A6x/OJIM= +codeberg.org/pata1704/drain3 v1.0.0/go.mod h1:+K1hIYh3hNSPiXRxUin6ZiC2CC9FDGqQKNNR+7ZIx9s= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -26,6 +28,8 @@ github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17k github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -34,8 +38,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= -github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= +github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M= github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -82,20 +86,20 @@ go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= -golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o= -golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8= -golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w= -golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww= -golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= -golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 h1:mgKeJMpvi0yx/sU5GsxQ7p6s2wtOnGAHZWCHUM4KGzY= +golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70= +golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA= +golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= -golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= +golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo= -golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg= +golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ= +golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -105,18 +109,20 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkep gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -modernc.org/cc/v4 v4.26.2 h1:991HMkLjJzYBIfha6ECZdjrIYz2/1ayr+FL8GN+CNzM= -modernc.org/cc/v4 v4.26.2/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0= -modernc.org/ccgo/v4 v4.28.0 h1:rjznn6WWehKq7dG4JtLRKxb52Ecv8OUGah8+Z/SfpNU= -modernc.org/ccgo/v4 v4.28.0/go.mod h1:JygV3+9AV6SmPhDasu4JgquwU81XAKLd3OKTUDNOiKE= -modernc.org/fileutil v1.3.8 h1:qtzNm7ED75pd1C7WgAGcK4edm4fvhtBsEiI/0NQ54YM= -modernc.org/fileutil v1.3.8/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc= +modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis= +modernc.org/cc/v4 v4.27.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0= +modernc.org/ccgo/v4 v4.30.1 h1:4r4U1J6Fhj98NKfSjnPUN7Ze2c6MnAdL0hWw6+LrJpc= +modernc.org/ccgo/v4 v4.30.1/go.mod h1:bIOeI1JL54Utlxn+LwrFyjCx2n2RDiYEaJVSrgdrRfM= +modernc.org/fileutil v1.3.40 h1:ZGMswMNc9JOCrcrakF1HrvmergNLAmxOPjizirpfqBA= +modernc.org/fileutil v1.3.40/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc= modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI= modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito= +modernc.org/gc/v3 v3.1.1 h1:k8T3gkXWY9sEiytKhcgyiZ2L0DTyCQ/nvX+LoCljoRE= +modernc.org/gc/v3 v3.1.1/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY= modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks= modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI= -modernc.org/libc v1.66.3 h1:cfCbjTUcdsKyyZZfEUKfoHcP3S0Wkvz3jgSzByEWVCQ= -modernc.org/libc v1.66.3/go.mod h1:XD9zO8kt59cANKvHPXpx7yS2ELPheAey0vjIuZOhOU8= +modernc.org/libc v1.67.6 h1:eVOQvpModVLKOdT+LvBPjdQqfrZq+pC39BygcT+E7OI= +modernc.org/libc v1.67.6/go.mod h1:JAhxUVlolfYDErnwiqaLvUqc8nfb2r6S6slAgZOnaiE= modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= @@ -125,8 +131,8 @@ modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8= modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns= modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w= modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE= -modernc.org/sqlite v1.39.0 h1:6bwu9Ooim0yVYA7IZn9demiQk/Ejp0BtTjBWFLymSeY= -modernc.org/sqlite v1.39.0/go.mod h1:cPTJYSlgg3Sfg046yBShXENNtPrWrDX8bsbAQBzgQ5E= +modernc.org/sqlite v1.44.1 h1:qybx/rNpfQipX/t47OxbHmkkJuv2JWifCMH8SVUiDas= +modernc.org/sqlite v1.44.1/go.mod h1:CzbrU2lSB1DKUusvwGz7rqEKIq+NUd8GWuBBZDs9/nA= modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= diff --git a/helpers/helpers.go b/helpers/helpers.go index e16183c..2a35776 100644 --- a/helpers/helpers.go +++ b/helpers/helpers.go @@ -3,11 +3,10 @@ package helpers import ( "fmt" "log/slog" - "os" "regexp" "strings" "time" - "tixel_watch/models" + "watch-tool/models" ) var ( @@ -76,11 +75,3 @@ func ParseSyslogTimeToRFC3339(syslogTime string) (time.Time, error) { t = t.AddDate(now.Year(), 0, 0) return t, nil } - -func GetHostname() (string, error) { - hostname, err := os.Hostname() - if err != nil { - hostname = "unknown" - } - return hostname, nil -} diff --git a/helpers/utils.go b/helpers/utils.go new file mode 100644 index 0000000..c8933bd --- /dev/null +++ b/helpers/utils.go @@ -0,0 +1,47 @@ +package helpers + +import ( + "context" + "fmt" + "log/slog" + "runtime/debug" +) + +type AppError struct { + Op string + Err error + Context string +} + +func (e *AppError) Error() string { + if e.Context != "" { + return fmt.Sprintf("%s: %v (%s)", e.Op, e.Err, e.Context) + } + return fmt.Sprintf("%s: %v", e.Op, e.Err) +} + +func (e *AppError) Unwrap() error { + return e.Err +} + +func NewAppError(op string, err error, ctx string) error { + return &AppError{Op: op, Err: err, Context: ctx} +} + +func SafeGo(ctx context.Context, name string, fn func()) { + go func() { + defer func() { + if r := recover(); r != nil { + stack := string(debug.Stack()) + slog.Error("CRITICAL: Panic recovered in goroutine", + "goroutine", name, + "panic", r, + "stack", stack, + ) + // Optional: Hier könnte man Metriken inkrementieren (siehe Observability) + } + }() + + fn() + }() +} diff --git a/install.sh b/install.sh deleted file mode 100755 index c203051..0000000 --- a/install.sh +++ /dev/null @@ -1,71 +0,0 @@ -#!/bin/bash -set -e - -ES_VERSION="9.1.2" -ES_DEB_URL="https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-${ES_VERSION}-amd64.deb" -ES_RPM_URL="https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-${ES_VERSION}-x86_64.rpm" -ES_CONFIG_DIR="/etc/elasticsearch" -ES_JVM_OPTIONS="/etc/elasticsearch/jvm.options" -ES_JVM_OPTIONS_D="/etc/elasticsearch/jvm.options.d" -GO_SERVICE_NAME="tixel-watch" -GO_INSTALL_TARGET="/opt/tixel/tixel-watch" - -install_es_deb() { - echo "Installing Elasticsearch (Debian package)..." - wget "${ES_DEB_URL}" -O elasticsearch.deb - sudo dpkg -i elasticsearch.deb - sudo apt-get install -f -y -} - -install_es_rpm() { - echo "Installing Elasticsearch (RPM package)..." - wget "${ES_RPM_URL}" -O elasticsearch.rpm - sudo rpm --install elasticsearch.rpm -} - -setup_configuration() { - echo "Copying Elasticsearch configuration files..." - sudo cp ./configs/elasticsearch.yml "${ES_CONFIG_DIR}/elasticsearch.yml" - sudo cp ./configs/jvm.options "${ES_JVM_OPTIONS}" - sudo cp -r ./configs/jvm.options.d "${ES_JVM_OPTIONS_D}" - sudo chown root:elasticsearch "${ES_CONFIG_DIR}/elasticsearch.yml" "${ES_JVM_OPTIONS}" - sudo chmod 640 "${ES_CONFIG_DIR}/elasticsearch.yml" "${ES_JVM_OPTIONS}" -} - -setup_tixel_watch_service() { - echo "Setting up tixel-watch systemd service..." - if [ ! -d ${GO_INSTALL_TARGET} ]; then - mkdir -p ${GO_INSTALL_TARGET} - fi - sudo cp ./tixel-watch "$GO_INSTALL_TARGET"/ - sudo cp ./configs/config.yaml "$GO_INSTALL_TARGET"/ - sudo cp ./${GO_SERVICE_NAME}.service /etc/systemd/system/${GO_SERVICE_NAME}.service - sudo systemctl daemon-reload - sudo systemctl enable "${GO_SERVICE_NAME}" -} - -start_services() { - echo "Enabling and starting Elasticsearch service..." - sudo systemctl enable elasticsearch - sudo systemctl start elasticsearch - echo "Starting tixel-watch service..." - sudo systemctl start "${GO_SERVICE_NAME}" -} - -main() { - if command -v apt-get &>/dev/null; then - install_es_deb - elif command -v yum &>/dev/null || command -v dnf &>/dev/null; then - install_es_rpm - else - echo "Unsupported package manager. Aborting." - exit 1 - fi - - setup_configuration - setup_tixel_watch_service - start_services - echo "All done." -} - -main "$@" diff --git a/local_storage.go b/local_storage.go index f4b6265..bafdb80 100644 --- a/local_storage.go +++ b/local_storage.go @@ -4,28 +4,277 @@ import ( "context" "database/sql" "encoding/json" - "tixel_watch/models" + "fmt" + "log/slog" + "os" + "path/filepath" + "sort" + "strings" + "sync" + "time" + "watch-tool/models" _ "modernc.org/sqlite" ) -type StorageService struct { - db *sql.DB +type SQLiteStorage struct { + db *sql.DB + dbPath string + rotationCfg StorageRotationConfig + rotationStop chan struct{} + rotationWg sync.WaitGroup + mu sync.RWMutex } -func NewStorageService(dbPath string) (*StorageService, error) { - db, err := sql.Open("sqlite", dbPath) +func DefaultRotationConfig() StorageRotationConfig { + return StorageRotationConfig{ + MaxSizeBytes: 100 * 1024 * 1024, // 100MB + MaxAgeHours: 48 * time.Hour, // 48 hours + MaxFiles: 3, // 3 old Files + CheckIntervalMinutes: 5 * time.Minute, // check every 5 minutes + ArchiveDir: "", // same directory + } +} + +func NewSQLiteStorage(dbPath string) (*SQLiteStorage, error) { + return NewSQLiteStorageWithRotation(dbPath, StorageRotationConfig{}) +} + +func NewSQLiteStorageWithRotation(dbPath string, rotationCfg StorageRotationConfig) (*SQLiteStorage, error) { + if rotationCfg.CheckIntervalMinutes == 0 { + rotationCfg = DefaultRotationConfig() + } + dsn := fmt.Sprintf("%s?_busy_timeout=5000&_journal_mode=WAL", dbPath) + + db, err := sql.Open("sqlite", dsn) + + if err != nil { + return nil, fmt.Errorf("failed to open SQLite database: %w", err) + } + + if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil { + return nil, fmt.Errorf("failed to enable WAL mode: %w", err) + } + + if err := createTables(db); err != nil { + return nil, fmt.Errorf("failed to create tables: %w", err) + } + + storage := &SQLiteStorage{ + db: db, + dbPath: dbPath, + rotationCfg: rotationCfg, + rotationStop: make(chan struct{}), + } + + if rotationCfg.MaxSizeBytes > 0 || rotationCfg.MaxAgeHours > 0 { + storage.rotationWg.Add(1) + go storage.rotationWorker() + slog.Info("Log rotation enabled", + "maxSize", rotationCfg.MaxSizeBytes, + "maxAge", rotationCfg.MaxAgeHours, + "maxFiles", rotationCfg.MaxFiles) + } + + return storage, nil +} + +func (s *SQLiteStorage) rotationWorker() { + defer s.rotationWg.Done() + ticker := time.NewTicker(s.rotationCfg.CheckIntervalMinutes) + defer ticker.Stop() + + for { + select { + case <-s.rotationStop: + return + case <-ticker.C: + if err := s.checkAndRotate(); err != nil { + slog.Error("Error during log rotation check", "error", err) + } + } + } +} + +func (s *SQLiteStorage) checkAndRotate() error { + s.mu.Lock() + defer s.mu.Unlock() + + needsRotation, reason, err := s.needsRotation() + if err != nil { + return fmt.Errorf("error checking rotation needs: %w", err) + } + + if needsRotation { + slog.Info("Starting log rotation", "reason", reason) + if err := s.rotateDatabase(); err != nil { + return fmt.Errorf("error rotating database: %w", err) + } + slog.Info("Log rotation completed successfully") + } + + return nil +} + +func (s *SQLiteStorage) needsRotation() (bool, string, error) { + if s.rotationCfg.MaxSizeBytes > 0 { + fileInfo, err := os.Stat(s.dbPath) + if err != nil { + return false, "", err + } + if fileInfo.Size() >= s.rotationCfg.MaxSizeBytes { + return true, fmt.Sprintf("file size %d >= max size %d", fileInfo.Size(), s.rotationCfg.MaxSizeBytes), nil + } + } + + if s.rotationCfg.MaxAgeHours > 0 { + fileInfo, err := os.Stat(s.dbPath) + if err != nil { + return false, "", err + } + age := time.Since(fileInfo.ModTime()) + if age >= s.rotationCfg.MaxAgeHours { + return true, fmt.Sprintf("file age %v >= max age %v", age, s.rotationCfg.MaxAgeHours), nil + } + } + + return false, "", nil +} + +func (s *SQLiteStorage) rotateDatabase() error { + if err := s.db.Close(); err != nil { + return fmt.Errorf("error closing database: %w", err) + } + + archivePath := s.generateArchivePath() + + if err := os.Rename(s.dbPath, archivePath); err != nil { + return fmt.Errorf("error moving database to archive: %w", err) + } + + db, err := sql.Open("sqlite", s.dbPath) + if err != nil { + return fmt.Errorf("error opening new database: %w", err) + } + + if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil { + return fmt.Errorf("failed to enable WAL mode on new database: %w", err) + } + + if err := createTables(db); err != nil { + return fmt.Errorf("failed to create tables in new database: %w", err) + } + + s.db = db + + if err := s.cleanupOldArchives(); err != nil { + slog.Warn("Error cleaning up old archives", "error", err) + } + + return nil +} + +func (s *SQLiteStorage) generateArchivePath() string { + dir := filepath.Dir(s.dbPath) + if s.rotationCfg.ArchiveDir != "" { + dir = s.rotationCfg.ArchiveDir + os.MkdirAll(dir, 0755) + } + + base := filepath.Base(s.dbPath) + ext := filepath.Ext(base) + name := strings.TrimSuffix(base, ext) + + timestamp := time.Now().Format("2006-01-02_15-04-05") + archiveName := fmt.Sprintf("%s.%s%s", name, timestamp, ext) + + return filepath.Join(dir, archiveName) +} + +func (s *SQLiteStorage) cleanupOldArchives() error { + if s.rotationCfg.MaxFiles <= 0 { + return nil + } + + dir := filepath.Dir(s.dbPath) + if s.rotationCfg.ArchiveDir != "" { + dir = s.rotationCfg.ArchiveDir + } + + base := filepath.Base(s.dbPath) + ext := filepath.Ext(base) + name := strings.TrimSuffix(base, ext) + pattern := fmt.Sprintf("%s.*%s", name, ext) + + files, err := filepath.Glob(filepath.Join(dir, pattern)) + if err != nil { + return err + } + + var archives []string + for _, file := range files { + if file != s.dbPath { + archives = append(archives, file) + } + } + + sort.Slice(archives, func(i, j int) bool { + infoI, _ := os.Stat(archives[i]) + infoJ, _ := os.Stat(archives[j]) + return infoI.ModTime().After(infoJ.ModTime()) + }) + + if len(archives) > s.rotationCfg.MaxFiles { + for _, file := range archives[s.rotationCfg.MaxFiles:] { + if err := os.Remove(file); err != nil { + slog.Warn("Error removing old archive", "file", file, "error", err) + } else { + slog.Info("Removed old archive", "file", file) + } + } + } + + return nil +} + +func (s *SQLiteStorage) ForceRotate() error { + s.mu.Lock() + defer s.mu.Unlock() + + slog.Info("Forcing log rotation") + return s.rotateDatabase() +} + +func (s *SQLiteStorage) GetRotationInfo() (map[string]any, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + fileInfo, err := os.Stat(s.dbPath) if err != nil { return nil, err } + info := map[string]any{ + "currentSize": fileInfo.Size(), + "maxSize": s.rotationCfg.MaxSizeBytes, + "currentAge": time.Since(fileInfo.ModTime()).String(), + "maxAge": s.rotationCfg.MaxAgeHours.String(), + "maxFiles": s.rotationCfg.MaxFiles, + "checkInterval": s.rotationCfg.CheckIntervalMinutes.String(), + "archiveDir": s.rotationCfg.ArchiveDir, + } + + return info, nil +} + +func createTables(db *sql.DB) error { createTableStmt := ` CREATE TABLE IF NOT EXISTS log_entries ( id INTEGER PRIMARY KEY AUTOINCREMENT, service TEXT, - timestamp DATETIME, - type TEXT, - host TEXT, + timestamp DATETIME NOT NULL, + type TEXT NOT NULL, + host TEXT NOT NULL, tool TEXT, log_level TEXT, log_message TEXT, @@ -39,143 +288,317 @@ func NewStorageService(dbPath string) (*StorageService, error) { fields TEXT, service_information TEXT, system_metrics TEXT, - tool_information TEXT - ); - ` - _, err = db.ExecContext(context.Background(), createTableStmt) - if err != nil { - return nil, err + tool_information TEXT, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + exported_at DATETIME + );` + + if _, err := db.Exec(createTableStmt); err != nil { + return err } - return &StorageService{db: db}, nil + indexes := []string{ + "CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp);", + "CREATE INDEX IF NOT EXISTS idx_service ON log_entries(service);", + "CREATE INDEX IF NOT EXISTS idx_type ON log_entries(type);", + "CREATE INDEX IF NOT EXISTS idx_tool ON log_entries(tool);", + "CREATE INDEX IF NOT EXISTS idx_log_level ON log_entries(log_level);", + "CREATE INDEX IF NOT EXISTS idx_exported ON log_entries(exported_at);", + "CREATE INDEX IF NOT EXISTS idx_composite ON log_entries(timestamp, type, service);", + } + + for _, index := range indexes { + if _, err := db.Exec(index); err != nil { + return fmt.Errorf("failed to create index: %w", err) + } + } + + return nil } -func (s *StorageService) Close() error { +func (s *SQLiteStorage) Store(ctx context.Context, entry *models.LogMessage) error { + return s.StoreBatch(ctx, []models.LogMessage{*entry}) +} + +func (s *SQLiteStorage) StoreBatch(ctx context.Context, entries []models.LogMessage) error { + if len(entries) == 0 { + return nil + } + + s.mu.RLock() + defer s.mu.RUnlock() + + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + defer tx.Rollback() + + stmt, err := tx.PrepareContext(ctx, ` + INSERT INTO log_entries + (service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, + unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`) + if err != nil { + return fmt.Errorf("failed to prepare statement: %w", err) + } + defer stmt.Close() + + for _, entry := range entries { + fieldsJSON, _ := json.Marshal(entry.Fields) + serviceInfoJSON, _ := json.Marshal(entry.ServiceInformation) + systemMetricsJSON, _ := json.Marshal(entry.SystemMetrics) + toolInfoJSON, _ := json.Marshal(entry.ToolInformation) + + _, err := stmt.ExecContext(ctx, + entry.Service, + entry.Timestamp, + entry.Type, + entry.Host, + entry.Tool, + entry.LogLevel, + entry.LogMessage, + entry.Raw, + entry.Priority, + entry.PriorityName, + entry.Unit, + entry.PID, + entry.BootID, + entry.MachineID, + string(fieldsJSON), + string(serviceInfoJSON), + string(systemMetricsJSON), + string(toolInfoJSON), + ) + if err != nil { + return fmt.Errorf("failed to insert entry: %w", err) + } + } + + return tx.Commit() +} + +func (s *SQLiteStorage) Query(ctx context.Context, query StorageQuery) ([]models.LogMessage, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + sqlQuery := "SELECT service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information FROM log_entries WHERE 1=1" + args := []any{} + argCount := 0 + + if !query.StartTime.IsZero() { + argCount++ + sqlQuery += fmt.Sprintf(" AND timestamp >= ?%d", argCount) + args = append(args, query.StartTime) + } + if !query.EndTime.IsZero() { + argCount++ + sqlQuery += fmt.Sprintf(" AND timestamp <= ?%d", argCount) + args = append(args, query.EndTime) + } + if query.Service != "" { + argCount++ + sqlQuery += fmt.Sprintf(" AND service = ?%d", argCount) + args = append(args, query.Service) + } + if query.Tool != "" { + argCount++ + sqlQuery += fmt.Sprintf(" AND tool = ?%d", argCount) + args = append(args, query.Tool) + } + if query.LogLevel != "" { + argCount++ + sqlQuery += fmt.Sprintf(" AND log_level = ?%d", argCount) + args = append(args, query.LogLevel) + } + if query.Type != "" { + argCount++ + sqlQuery += fmt.Sprintf(" AND type = ?%d", argCount) + args = append(args, query.Type) + } + + if query.OrderBy != "" { + direction := "ASC" + if query.OrderDesc { + direction = "DESC" + } + sqlQuery += fmt.Sprintf(" ORDER BY %s %s", query.OrderBy, direction) + } else { + sqlQuery += " ORDER BY timestamp DESC" + } + + if query.Limit > 0 { + sqlQuery += fmt.Sprintf(" LIMIT %d", query.Limit) + if query.Offset > 0 { + sqlQuery += fmt.Sprintf(" OFFSET %d", query.Offset) + } + } + + rows, err := s.db.QueryContext(ctx, sqlQuery, args...) + if err != nil { + return nil, fmt.Errorf("failed to execute query: %w", err) + } + defer rows.Close() + + var entries []models.LogMessage + for rows.Next() { + var entry models.LogMessage + var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string + + err := rows.Scan( + &entry.Service, + &entry.Timestamp, + &entry.Type, + &entry.Host, + &entry.Tool, + &entry.LogLevel, + &entry.LogMessage, + &entry.Raw, + &entry.Priority, + &entry.PriorityName, + &entry.Unit, + &entry.PID, + &entry.BootID, + &entry.MachineID, + &fieldsJSON, + &serviceInfoJSON, + &systemMetricsJSON, + &toolInfoJSON, + ) + if err != nil { + return nil, fmt.Errorf("failed to scan row: %w", err) + } + + if fieldsJSON != "" && fieldsJSON != "null" { + json.Unmarshal([]byte(fieldsJSON), &entry.Fields) + } + if serviceInfoJSON != "" && serviceInfoJSON != "null" { + json.Unmarshal([]byte(serviceInfoJSON), &entry.ServiceInformation) + } + if systemMetricsJSON != "" && systemMetricsJSON != "null" { + json.Unmarshal([]byte(systemMetricsJSON), &entry.SystemMetrics) + } + if toolInfoJSON != "" && toolInfoJSON != "null" { + json.Unmarshal([]byte(toolInfoJSON), &entry.ToolInformation) + } + + entries = append(entries, entry) + } + + return entries, rows.Err() +} + +func (s *SQLiteStorage) MarkAsExported(ctx context.Context, ids []int64) error { + if len(ids) == 0 { + return nil + } + + s.mu.RLock() + defer s.mu.RUnlock() + + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return err + } + + placeholders := strings.Repeat("?,", len(ids)) + placeholders = placeholders[:len(placeholders)-1] + + sqlQuery := fmt.Sprintf("UPDATE log_entries SET exported_at = CURRENT_TIMESTAMP WHERE id IN (%s)", placeholders) + + args := make([]any, len(ids)) + for i, id := range ids { + args[i] = id + } + + _, err = tx.ExecContext(ctx, sqlQuery, args...) + if err != nil { + tx.Rollback() + return err + } + + return tx.Commit() +} + +func (s *SQLiteStorage) GetUnexportedEntries(ctx context.Context, limit int) ([]models.LogMessage, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + sqlQuery := `SELECT id, service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, + unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information + FROM log_entries WHERE exported_at IS NULL ORDER BY timestamp ASC` + + if limit > 0 { + sqlQuery += fmt.Sprintf(" LIMIT %d", limit) + } + + rows, err := s.db.QueryContext(ctx, sqlQuery) + if err != nil { + return nil, fmt.Errorf("failed to execute query: %w", err) + } + defer rows.Close() + + var entries []models.LogMessage + for rows.Next() { + var entry models.LogMessage + var id int64 + var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string + + err := rows.Scan( + &id, + &entry.Service, + &entry.Timestamp, + &entry.Type, + &entry.Host, + &entry.Tool, + &entry.LogLevel, + &entry.LogMessage, + &entry.Raw, + &entry.Priority, + &entry.PriorityName, + &entry.Unit, + &entry.PID, + &entry.BootID, + &entry.MachineID, + &fieldsJSON, + &serviceInfoJSON, + &systemMetricsJSON, + &toolInfoJSON, + ) + if err != nil { + return nil, fmt.Errorf("failed to scan row: %w", err) + } + + if entry.Fields == nil { + entry.Fields = make(map[string]any) + } + entry.Fields["_internal_id"] = id + + if fieldsJSON != "" && fieldsJSON != "null" { + json.Unmarshal([]byte(fieldsJSON), &entry.Fields) + } + if serviceInfoJSON != "" && serviceInfoJSON != "null" { + json.Unmarshal([]byte(serviceInfoJSON), &entry.ServiceInformation) + } + if systemMetricsJSON != "" && systemMetricsJSON != "null" { + json.Unmarshal([]byte(systemMetricsJSON), &entry.SystemMetrics) + } + if toolInfoJSON != "" && toolInfoJSON != "null" { + json.Unmarshal([]byte(toolInfoJSON), &entry.ToolInformation) + } + + entries = append(entries, entry) + } + + return entries, rows.Err() +} + +func (s *SQLiteStorage) Close() error { + close(s.rotationStop) + s.rotationWg.Wait() + + s.mu.Lock() + defer s.mu.Unlock() + return s.db.Close() } - -func (s *StorageService) SaveLogEntry(ctx context.Context, entry *models.LogMessage) error { - fieldsJSON := "" - if entry.Fields != nil { - b, err := json.Marshal(entry.Fields) - if err != nil { - return err - } - fieldsJSON = string(b) - } - - serviceInfoJSON := "" - if entry.ServiceInformation != nil { - b, err := json.Marshal(entry.ServiceInformation) - if err != nil { - return err - } - serviceInfoJSON = string(b) - } - - systemMetricsJSON := "" - if entry.SystemMetrics != nil { - b, err := json.Marshal(entry.SystemMetrics) - if err != nil { - return err - } - systemMetricsJSON = string(b) - } - - toolInfoJSON := "" - if entry.ToolInformation != nil { - b, err := json.Marshal(entry.ToolInformation) - if err != nil { - return err - } - toolInfoJSON = string(b) - } - - stmt := ` - INSERT INTO log_entries - (service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - ` - _, err := s.db.ExecContext(ctx, stmt, - entry.Service, - entry.Timestamp, - entry.Type, - entry.Host, - entry.Tool, - entry.LogLevel, - entry.LogMessage, - entry.Raw, - entry.Priority, - entry.PriorityName, - entry.Unit, - entry.PID, - entry.BootID, - entry.MachineID, - fieldsJSON, - serviceInfoJSON, - systemMetricsJSON, - toolInfoJSON, - ) - return err -} - -func (s *StorageService) LoadLogEntry(ctx context.Context, id int64) (*models.LogMessage, error) { - row := s.db.QueryRowContext(ctx, "SELECT service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information FROM log_entries WHERE id = ?", id) - - var entry models.LogMessage - var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string - - err := row.Scan( - &entry.Service, - &entry.Timestamp, - &entry.Type, - &entry.Host, - &entry.Tool, - &entry.LogLevel, - &entry.LogMessage, - &entry.Raw, - &entry.Priority, - &entry.PriorityName, - &entry.Unit, - &entry.PID, - &entry.BootID, - &entry.MachineID, - &fieldsJSON, - &serviceInfoJSON, - &systemMetricsJSON, - &toolInfoJSON, - ) - if err != nil { - return nil, err - } - - if fieldsJSON != "" { - var fields map[string]any - if err = json.Unmarshal([]byte(fieldsJSON), &fields); err == nil { - entry.Fields = fields - } - } - - if serviceInfoJSON != "" { - var si any - if err = json.Unmarshal([]byte(serviceInfoJSON), &si); err == nil { - entry.ServiceInformation = si - } - } - - if systemMetricsJSON != "" { - var sm any - if err = json.Unmarshal([]byte(systemMetricsJSON), &sm); err == nil { - entry.SystemMetrics = sm - } - } - - if toolInfoJSON != "" { - var ti any - if err = json.Unmarshal([]byte(toolInfoJSON), &ti); err == nil { - entry.ToolInformation = ti - } - } - - return &entry, nil -} diff --git a/local_storageV2.go b/local_storageV2.go deleted file mode 100644 index bf2f2d3..0000000 --- a/local_storageV2.go +++ /dev/null @@ -1,356 +0,0 @@ -package main - -import ( - "context" - "database/sql" - "encoding/json" - "fmt" - "strings" - "tixel_watch/models" - - _ "modernc.org/sqlite" -) - -type SQLiteStorage struct { - db *sql.DB -} - -func NewSQLiteStorage(dbPath string) (*SQLiteStorage, error) { - db, err := sql.Open("sqlite", dbPath) - if err != nil { - return nil, fmt.Errorf("failed to open SQLite database: %w", err) - } - - if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil { - return nil, fmt.Errorf("failed to enable WAL mode: %w", err) - } - - if err := createTables(db); err != nil { - return nil, fmt.Errorf("failed to create tables: %w", err) - } - - return &SQLiteStorage{db: db}, nil -} - -func createTables(db *sql.DB) error { - createTableStmt := ` - CREATE TABLE IF NOT EXISTS log_entries ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - service TEXT, - timestamp DATETIME NOT NULL, - type TEXT NOT NULL, - host TEXT NOT NULL, - tool TEXT, - log_level TEXT, - log_message TEXT, - raw TEXT, - priority TEXT, - priority_name TEXT, - unit TEXT, - pid INTEGER, - boot_id TEXT, - machine_id TEXT, - fields TEXT, - service_information TEXT, - system_metrics TEXT, - tool_information TEXT, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - exported_at DATETIME - );` - - if _, err := db.Exec(createTableStmt); err != nil { - return err - } - - indexes := []string{ - "CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp);", - "CREATE INDEX IF NOT EXISTS idx_service ON log_entries(service);", - "CREATE INDEX IF NOT EXISTS idx_type ON log_entries(type);", - "CREATE INDEX IF NOT EXISTS idx_tool ON log_entries(tool);", - "CREATE INDEX IF NOT EXISTS idx_log_level ON log_entries(log_level);", - "CREATE INDEX IF NOT EXISTS idx_exported ON log_entries(exported_at);", - "CREATE INDEX IF NOT EXISTS idx_composite ON log_entries(timestamp, type, service);", - } - - for _, index := range indexes { - if _, err := db.Exec(index); err != nil { - return fmt.Errorf("failed to create index: %w", err) - } - } - - return nil -} - -func (s *SQLiteStorage) Store(ctx context.Context, entry *models.LogMessage) error { - return s.StoreBatch(ctx, []models.LogMessage{*entry}) -} - -func (s *SQLiteStorage) StoreBatch(ctx context.Context, entries []models.LogMessage) error { - if len(entries) == 0 { - return nil - } - - tx, err := s.db.BeginTx(ctx, nil) - if err != nil { - return fmt.Errorf("failed to begin transaction: %w", err) - } - defer tx.Rollback() - - stmt, err := tx.PrepareContext(ctx, ` - INSERT INTO log_entries - (service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, - unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`) - if err != nil { - return fmt.Errorf("failed to prepare statement: %w", err) - } - defer stmt.Close() - - for _, entry := range entries { - fieldsJSON, _ := json.Marshal(entry.Fields) - serviceInfoJSON, _ := json.Marshal(entry.ServiceInformation) - systemMetricsJSON, _ := json.Marshal(entry.SystemMetrics) - toolInfoJSON, _ := json.Marshal(entry.ToolInformation) - - _, err := stmt.ExecContext(ctx, - entry.Service, - entry.Timestamp, - entry.Type, - entry.Host, - entry.Tool, - entry.LogLevel, - entry.LogMessage, - entry.Raw, - entry.Priority, - entry.PriorityName, - entry.Unit, - entry.PID, - entry.BootID, - entry.MachineID, - string(fieldsJSON), - string(serviceInfoJSON), - string(systemMetricsJSON), - string(toolInfoJSON), - ) - if err != nil { - return fmt.Errorf("failed to insert entry: %w", err) - } - } - - return tx.Commit() -} - -func (s *SQLiteStorage) Query(ctx context.Context, query StorageQuery) ([]models.LogMessage, error) { - sqlQuery := "SELECT service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information FROM log_entries WHERE 1=1" - args := []any{} - argCount := 0 - - if !query.StartTime.IsZero() { - argCount++ - sqlQuery += fmt.Sprintf(" AND timestamp >= ?%d", argCount) - args = append(args, query.StartTime) - } - if !query.EndTime.IsZero() { - argCount++ - sqlQuery += fmt.Sprintf(" AND timestamp <= ?%d", argCount) - args = append(args, query.EndTime) - } - if query.Service != "" { - argCount++ - sqlQuery += fmt.Sprintf(" AND service = ?%d", argCount) - args = append(args, query.Service) - } - if query.Tool != "" { - argCount++ - sqlQuery += fmt.Sprintf(" AND tool = ?%d", argCount) - args = append(args, query.Tool) - } - if query.LogLevel != "" { - argCount++ - sqlQuery += fmt.Sprintf(" AND log_level = ?%d", argCount) - args = append(args, query.LogLevel) - } - if query.Type != "" { - argCount++ - sqlQuery += fmt.Sprintf(" AND type = ?%d", argCount) - args = append(args, query.Type) - } - - if query.OrderBy != "" { - direction := "ASC" - if query.OrderDesc { - direction = "DESC" - } - sqlQuery += fmt.Sprintf(" ORDER BY %s %s", query.OrderBy, direction) - } else { - sqlQuery += " ORDER BY timestamp DESC" - } - - if query.Limit > 0 { - sqlQuery += fmt.Sprintf(" LIMIT %d", query.Limit) - if query.Offset > 0 { - sqlQuery += fmt.Sprintf(" OFFSET %d", query.Offset) - } - } - - rows, err := s.db.QueryContext(ctx, sqlQuery, args...) - if err != nil { - return nil, fmt.Errorf("failed to execute query: %w", err) - } - defer rows.Close() - - var entries []models.LogMessage - for rows.Next() { - var entry models.LogMessage - var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string - - err := rows.Scan( - &entry.Service, - &entry.Timestamp, - &entry.Type, - &entry.Host, - &entry.Tool, - &entry.LogLevel, - &entry.LogMessage, - &entry.Raw, - &entry.Priority, - &entry.PriorityName, - &entry.Unit, - &entry.PID, - &entry.BootID, - &entry.MachineID, - &fieldsJSON, - &serviceInfoJSON, - &systemMetricsJSON, - &toolInfoJSON, - ) - if err != nil { - return nil, fmt.Errorf("failed to scan row: %w", err) - } - - if fieldsJSON != "" && fieldsJSON != "null" { - json.Unmarshal([]byte(fieldsJSON), &entry.Fields) - } - if serviceInfoJSON != "" && serviceInfoJSON != "null" { - json.Unmarshal([]byte(serviceInfoJSON), &entry.ServiceInformation) - } - if systemMetricsJSON != "" && systemMetricsJSON != "null" { - json.Unmarshal([]byte(systemMetricsJSON), &entry.SystemMetrics) - } - if toolInfoJSON != "" && toolInfoJSON != "null" { - json.Unmarshal([]byte(toolInfoJSON), &entry.ToolInformation) - } - - entries = append(entries, entry) - } - - return entries, rows.Err() -} - -func (s *SQLiteStorage) MarkAsExported(ctx context.Context, ids []int64) error { - if len(ids) == 0 { - return nil - } - - tx, err := s.db.BeginTx(ctx, nil) - if err != nil { - return err - } - - placeholders := strings.Repeat("?,", len(ids)) - placeholders = placeholders[:len(placeholders)-1] - - sqlQuery := fmt.Sprintf("UPDATE log_entries SET exported_at = CURRENT_TIMESTAMP WHERE id IN (%s)", placeholders) - - args := make([]any, len(ids)) - for i, id := range ids { - args[i] = id - } - - _, err = tx.ExecContext(ctx, sqlQuery, args...) - if err != nil { - tx.Rollback() - return err - } - - err = tx.Commit() - if err != nil { - return err - } - - return err -} - -func (s *SQLiteStorage) GetUnexportedEntries(ctx context.Context, limit int) ([]models.LogMessage, error) { - sqlQuery := `SELECT id, service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, - unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information - FROM log_entries WHERE exported_at IS NULL ORDER BY timestamp ASC` - - if limit > 0 { - sqlQuery += fmt.Sprintf(" LIMIT %d", limit) - } - - rows, err := s.db.QueryContext(ctx, sqlQuery) - if err != nil { - return nil, fmt.Errorf("failed to execute query: %w", err) - } - defer rows.Close() - - var entries []models.LogMessage - for rows.Next() { - var entry models.LogMessage - var id int64 - var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string - - err := rows.Scan( - &id, - &entry.Service, - &entry.Timestamp, - &entry.Type, - &entry.Host, - &entry.Tool, - &entry.LogLevel, - &entry.LogMessage, - &entry.Raw, - &entry.Priority, - &entry.PriorityName, - &entry.Unit, - &entry.PID, - &entry.BootID, - &entry.MachineID, - &fieldsJSON, - &serviceInfoJSON, - &systemMetricsJSON, - &toolInfoJSON, - ) - if err != nil { - return nil, fmt.Errorf("failed to scan row: %w", err) - } - - if entry.Fields == nil { - entry.Fields = make(map[string]any) - } - entry.Fields["_internal_id"] = id - - if fieldsJSON != "" && fieldsJSON != "null" { - json.Unmarshal([]byte(fieldsJSON), &entry.Fields) - } - if serviceInfoJSON != "" && serviceInfoJSON != "null" { - json.Unmarshal([]byte(serviceInfoJSON), &entry.ServiceInformation) - } - if systemMetricsJSON != "" && systemMetricsJSON != "null" { - json.Unmarshal([]byte(systemMetricsJSON), &entry.SystemMetrics) - } - if toolInfoJSON != "" && toolInfoJSON != "null" { - json.Unmarshal([]byte(toolInfoJSON), &entry.ToolInformation) - } - - entries = append(entries, entry) - } - - return entries, rows.Err() -} - -func (s *SQLiteStorage) Close() error { - return s.db.Close() -} diff --git a/log_processor.go b/log_processor.go index ab0fd25..cb11b84 100644 --- a/log_processor.go +++ b/log_processor.go @@ -4,7 +4,7 @@ import ( "context" "log/slog" "time" - "tixel_watch/models" + "watch-tool/models" ) type LogProcessor struct { diff --git a/main.go b/main.go index 79e5dfe..6bf2735 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "codeberg.org/pata1704/drain3" "context" "log/slog" "os" @@ -8,30 +9,60 @@ import ( "sync" "syscall" "time" - "tixel_watch/models" + "watch-tool/helpers" + "watch-tool/models" + "watch-tool/patterns" ) -var hostname string +var currentHostname string func init() { var err error - hostname, err = os.Hostname() + currentHostname, err = os.Hostname() if err != nil { - hostname = "unknown" + currentHostname = "unknown" + slog.Warn("Could not determine hostname, using fallback", "fallback", currentHostname) } } func main() { - cfg, err := LoadConfigV2() + cfg, err := LoadConfig() if err != nil { - slog.Error("error loading configuration", "error", err) + slog.Error("Startup failed: configuration error", "error", err) os.Exit(1) } - slog.Info("TIXEL System Monitor started") + + slog.Info("System Monitor started", "hostname", currentHostname) + + if err := patterns.GetInstance().Load(cfg.PatternsFile); err != nil { + slog.Error("Startup failed: could not load patterns", "file", cfg.PatternsFile, "error", err) + os.Exit(1) + } + slog.Info("Regex patterns loaded successfully", "file", cfg.PatternsFile) + + var d3Cfg *drain3.Config + if cfg.Drain3.Enabled { + d3Cfg = &drain3.Config{ + Depth: cfg.Drain3.Depth, + SimTh: cfg.Drain3.SimThreshold, + MaxChildren: cfg.Drain3.MaxChildren, + } + slog.Info("Drain3 anomaly detection enabled", "state_dir", cfg.Drain3.StateDir) + } else { + slog.Info("Drain3 anomaly detection disabled") + } var storage StorageInterface if cfg.LocalStorage.Enable { - sqliteStorage, err := NewSQLiteStorage(cfg.LocalStorage.DBPath) + rotationConfig := StorageRotationConfig{ + MaxSizeBytes: cfg.LocalStorage.RotationConfig.MaxSizeBytes, + MaxAgeHours: cfg.LocalStorage.RotationConfig.GetMaxAge(), + MaxFiles: cfg.LocalStorage.RotationConfig.MaxFiles, + CheckIntervalMinutes: cfg.LocalStorage.RotationConfig.GetCheckInterval(), + ArchiveDir: cfg.LocalStorage.RotationConfig.ArchiveDir, + } + + sqliteStorage, err := NewSQLiteStorageWithRotation(cfg.LocalStorage.DBPath, rotationConfig) if err != nil { slog.Error("failed to initialize SQLite storage", "error", err) os.Exit(1) @@ -57,7 +88,7 @@ func main() { exportManager = NewExportManager(storage, exportConfig) if cfg.Elasticsearch.Enabled { - esExporter, err := NewElasticsearchExporterV2(cfg.Elasticsearch) + esExporter, err := NewElasticsearchExporter(cfg.Elasticsearch) if err != nil { slog.Error("failed to create Elasticsearch exporter", "error", err) os.Exit(1) @@ -71,10 +102,6 @@ func main() { exportManager.RegisterExporter("elasticsearch", esExporter) slog.Info("Elasticsearch exporter registered") } - - // Add more exporters here in the future - // exportManager.RegisterExporter("checkmk", checkmkExporter) - // exportManager.RegisterExporter("grafana", grafanaExporter) } logChan := make(chan models.LogMessage, 1000) @@ -84,86 +111,92 @@ func main() { var wg sync.WaitGroup wg.Add(1) - go func() { + helpers.SafeGo(ctx, "LogProcessor", func() { defer wg.Done() processor := NewLogProcessor(storage) processor.Start(ctx, logChan) - }() + }) if exportManager != nil { wg.Add(1) - go func() { + helpers.SafeGo(ctx, "ExportManager", func() { defer wg.Done() exportManager.Start(ctx) - }() + }) } for _, service := range cfg.Services { if !service.Enabled { - slog.Info("Service deactivated, skipping...", "service", service.Name) + slog.Debug("Service deactivated, skipping...", "service", service.Name) continue } wg.Add(1) - go func(s ServiceConfig) { - defer wg.Done() - monitor := NewServiceMonitor(s) - if err := monitor.Start(ctx, logChan); err != nil { - slog.Error("error watching service", "service", s.Name, "error", err) - } - }(service) + srv := service - slog.Info("started watching Service-Log", "service", service.Name) + helpers.SafeGo(ctx, "ServiceMonitor-"+srv.Name, func() { + defer wg.Done() + monitor := NewServiceMonitor(srv, currentHostname, d3Cfg, cfg.Drain3.StateDir) + + if err := monitor.Start(ctx, logChan); err != nil { + slog.Error("Error watching service", "service", srv.Name, "error", err) + } + }) + + slog.Info("Started watching Service-Log", "service", service.Name) } for _, tool := range cfg.Tools { if !tool.Enabled { - slog.Info("Tool is deactivated, skipping...", "tool", tool.Name) + slog.Debug("Tool is deactivated, skipping...", "tool", tool.Name) continue } wg.Add(1) - go func(t ToolConfig) { - defer wg.Done() - monitor := NewFileMonitor(t) - if err := monitor.Start(ctx, logChan); err != nil { - slog.Error("error watching", "tool", t.Name, "error", err) - } - }(tool) + t := tool - slog.Info("started watching logs", "tool", tool.Name, "file", tool.LogFile) + helpers.SafeGo(ctx, "FileMonitor-"+t.Name, func() { + defer wg.Done() + + monitor := NewFileMonitor(t, currentHostname, d3Cfg, cfg.Drain3.StateDir) + + if err := monitor.Start(ctx, logChan); err != nil { + slog.Error("Error watching tool", "tool", t.Name, "error", err) + } + }) + + slog.Info("Started watching logs", "tool", tool.Name, "file", tool.LogFile) } if cfg.SystemMetrics.Enabled { wg.Add(1) - go func() { + helpers.SafeGo(ctx, "SystemMetrics", func() { defer wg.Done() - collector := NewSystemMetricsCollector(cfg.SystemMetrics, cfg.PollIntervalSeconds) - collector.StartV2(ctx, storage, logChan) - }() + collector := NewSystemMetricsCollector(cfg.SystemMetrics, cfg.PollIntervalSeconds, currentHostname) + collector.Start(ctx, storage, logChan) + }) slog.Info("Started collecting System-Metrics") } if cfg.WebService.Enabled { wg.Add(1) - go func() { + helpers.SafeGo(ctx, "WebService", func() { defer wg.Done() - webService := NewWebServiceV2(cfg, storage) + webService := NewWebService(cfg, storage) if err := webService.Start(ctx); err != nil { - slog.Error("web service error", "error", err) + slog.Error("Web service error", "error", err) } - }() + }) slog.Info("Web service started", "host", cfg.WebService.Host, "port", cfg.WebService.Port) } sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) - <-sigCh - slog.Info("Shutdown-Signal received, stopping threads...") + s := <-sigCh + slog.Info("Shutdown signal received, stopping threads...", "signal", s) cancel() - close(logChan) done := make(chan struct{}) go func() { @@ -173,9 +206,11 @@ func main() { select { case <-done: - slog.Info("All threads closed") + close(logChan) + slog.Info("All threads closed gracefully") case <-time.After(10 * time.Second): - slog.Info("Shutdown-Timeout reached, force quitting") + slog.Error("Shutdown timeout reached, force quitting") + os.Exit(1) } slog.Info("Program stopped") diff --git a/models/models.go b/models/models.go index 5c9cfee..8e8990e 100644 --- a/models/models.go +++ b/models/models.go @@ -136,18 +136,8 @@ type LogMessage struct { BootID string `json:"boot_id,omitempty"` MachineID string `json:"machine_id,omitempty"` Fields map[string]any `json:"fields,omitempty"` - // SyslogInfo SyslogFields `json:"syslog_information,omitempty"` } -// type LogMessage struct { -// Service string `json:"service"` -// Timestamp time.Time `json:"timestamp"` -// LogLevel string `json:"log_level"` -// LogMessage string `json:"log_message"` -// SyslogInfo SyslogFields `json:"syslog_information"` -// ServiceInformation any `json:"service_info,omitempty"` -// } - type SyslogFields struct { SysLogTimestamp time.Time `json:"syslog_timestamp"` Hostname string `json:"hostname"` diff --git a/parser/am_parser.go b/parser/am_parser.go deleted file mode 100644 index cfd9950..0000000 --- a/parser/am_parser.go +++ /dev/null @@ -1,49 +0,0 @@ -package parser - -import ( - "log/slog" - "regexp" - "strings" - "time" - "tixel_watch/helpers" - "tixel_watch/models" -) - -var ( - amServicePattern = regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z)\s+(\w+)\s+(\d+)\s+---\s+\[\s*([^\]]*)\]\s+([\w\.]+)\s*:\s*(.*)$`) -) - -type AMParser struct{} - -func (a *AMParser) Parse(line string) (models.LogMessage, error) { - newEntry := models.LogMessage{ - Service: "access-manager", - } - syslogFields, logContent := helpers.ExtractSyslogHeader(line) - newEntry.Host = syslogFields.Hostname - - matches := amServicePattern.FindStringSubmatch(strings.TrimSpace(logContent)) - if len(matches) != 7 { - newEntry.Timestamp = time.Now() - newEntry.LogMessage = line - return newEntry, nil - } - - timestampStr := strings.Join(strings.Split(matches[1], " "), "T") - timestamp, err := helpers.ParseRFC3339WithOptionalZ(timestampStr) - if err != nil { - slog.Error("unable to parse time", "error", err) - return newEntry, err - } - baseInfo := models.AMBaseInfo{ - ProcessID: matches[3], - ThreadID: strings.TrimSpace(matches[4]), - LoggerName: matches[5], - } - newEntry.Timestamp = timestamp - newEntry.LogLevel = matches[2] - newEntry.LogMessage = matches[6] - newEntry.ServiceInformation = baseInfo - - return newEntry, nil -} diff --git a/parser/default_parser.go b/parser/default_parser.go deleted file mode 100644 index 664fc52..0000000 --- a/parser/default_parser.go +++ /dev/null @@ -1,28 +0,0 @@ -package parser - -import ( - "strings" - "time" - "tixel_watch/models" -) - -type DefaultParser struct { - Service string - Tool string -} - -func (d *DefaultParser) Parse(line string) (models.LogMessage, error) { - msg := models.LogMessage{ - LogLevel: "unknown", - LogMessage: strings.TrimSpace(line), - Raw: line, - Timestamp: time.Now(), - } - if d.Service != "" { - msg.Service = d.Service - } - if d.Tool != "" { - msg.Tool = d.Tool - } - return msg, nil -} diff --git a/parser/factory.go b/parser/factory.go index fb6bc15..ba4afd0 100644 --- a/parser/factory.go +++ b/parser/factory.go @@ -1,27 +1,20 @@ package parser -func New(serviceName, logType string) (Parser, error) { - switch logType { - case "custom": - switch serviceName { - case "tixstream": - return &TSParser{}, nil - case "transfer-job-manager": - return &TJMParser{}, nil - case "access-manager": - return &arser{}, nil - case "tixel-control-center": - return &TCCParser{}, nil - case "nginx": - return &NginxParser{}, nil - case "nginx-tjm": - return &NginxTJMLogParser{ToolName: serviceName}, nil - default: - return &DefaultParser{Service: serviceName}, nil - } +import "codeberg.org/pata1704/drain3" + +type ParserConfig struct { + ServiceName string + LogType string + Hostname string + DrainConfig *drain3.Config + StateDir string +} + +func New(cfg ParserConfig) (Parser, error) { + switch cfg.LogType { case "json": return &JSONParser{}, nil default: - return &DefaultParser{Service: serviceName}, nil + return NewGenericParser(cfg.ServiceName, cfg.Hostname, cfg.DrainConfig, cfg.StateDir), nil } } diff --git a/parser/generic_parser.go b/parser/generic_parser.go new file mode 100644 index 0000000..dd666c3 --- /dev/null +++ b/parser/generic_parser.go @@ -0,0 +1,231 @@ +package parser + +import ( + "fmt" + "log/slog" + "os" + "path/filepath" + "strconv" + "strings" + "time" + "watch-tool/models" + "watch-tool/patterns" + + "codeberg.org/pata1704/drain3" +) + +type GenericParser struct { + ServiceName string + Hostname string + Extractors []patterns.CompiledExtractor + CommonExt []patterns.CompiledExtractor + drainMiner *drain3.TemplateMiner + drainEnabled bool + drainStatePath string +} + +func NewGenericParser(serviceName, hostname string, drainCfg *drain3.Config, stateDir string) *GenericParser { + repo := patterns.GetInstance() + + var svcExt, commonExt []patterns.CompiledExtractor + if repo != nil { + svcExt = repo.GetExtractors(serviceName) + commonExt = repo.GetExtractors("common") + } else { + slog.Error("CRITICAL: Pattern Repository is nil. Parser will not work correctly.") + } + + parser := &GenericParser{ + ServiceName: serviceName, + Hostname: hostname, + Extractors: svcExt, + CommonExt: commonExt, + } + if drainCfg != nil && stateDir != "" { + parser.drainEnabled = true + + parser.drainStatePath = filepath.Join(stateDir, serviceName+".bin") + + if err := os.MkdirAll(stateDir, 0755); err != nil { + slog.Error("Failed to create drain3 state dir", "error", err) + parser.drainEnabled = false + return parser + } + + persister := drain3.NewFilePersistence(parser.drainStatePath, false) + + state, err := persister.LoadState() + if err == nil && state != nil { + parser.drainMiner = drain3.NewTemplateMiner(drainCfg, persister) + slog.Info("Drain3 state loaded", "service", serviceName, "clusters", len(state.Clusters)) + } else { + parser.drainMiner = drain3.NewTemplateMiner(drainCfg, persister) + slog.Info("Drain3 initialized fresh", "service", serviceName) + } + } + + return parser +} + +func (p *GenericParser) Parse(line string) (models.LogMessage, error) { + entry := models.LogMessage{ + Service: p.ServiceName, + Host: p.Hostname, + Timestamp: time.Now(), + Raw: line, + Fields: make(map[string]any), + Type: "log_entry", + } + + trimmedLine := strings.TrimSpace(line) + if trimmedLine == "" { + return entry, nil + } + + if p.drainEnabled && p.drainMiner != nil { + cluster := p.drainMiner.AddLogMessage(trimmedLine) + if cluster != nil { + entry.Fields["drain_template_id"] = cluster.ClusterID + entry.Fields["drain_template"] = cluster.TemplateMined + // Optional: Parameter extrahieren, die Drain gefunden hat (Wildcards) + } + } + + allExtractors := append(p.CommonExt, p.Extractors...) + + matchedAny := false + + for _, ext := range allExtractors { + matches := ext.Pattern.FindStringSubmatch(trimmedLine) + if matches == nil { + continue + } + matchedAny = true + + subexpNames := ext.Pattern.SubexpNames() + for i, matchValue := range matches { + if i == 0 { + continue + } + + groupName := subexpNames[i] + if groupName == "" { + continue + } + + cleanValue := strings.TrimSpace(matchValue) + + targetType := ext.Fields[groupName] + parsedValue := p.safeConvert(cleanValue, targetType) + + p.mapField(&entry, groupName, parsedValue) + } + } + + if !matchedAny { + entry.LogMessage = trimmedLine + entry.Fields["_parse_status"] = "failed" + } else if entry.LogMessage == "" { + entry.LogMessage = trimmedLine + } + + return entry, nil +} + +func (p *GenericParser) Close() error { + if p.drainEnabled && p.drainMiner != nil { + if err := p.drainMiner.SaveState("shutdown"); err != nil { + slog.Error("Failed to save drain3 state", "service", p.ServiceName, "error", err) + return err + } + slog.Debug("Drain3 state saved", "service", p.ServiceName) + } + return nil +} + +func (p *GenericParser) safeConvert(value, typeDef string) any { + if value == "" || value == "-" { + if strings.HasPrefix(typeDef, "int") || strings.HasPrefix(typeDef, "float") { + return 0 + } + return value + } + + var err error + var result any + + switch { + case strings.HasPrefix(typeDef, "int"): + var i int + i, err = strconv.Atoi(value) + result = i + + case strings.HasPrefix(typeDef, "float"): + var f float64 + f, err = strconv.ParseFloat(value, 64) + result = f + + case strings.HasPrefix(typeDef, "time:"): + layout := strings.TrimPrefix(typeDef, "time:") + result, err = p.parseTimeRobust(value, layout) + + case typeDef == "bool": + var b bool + b, err = strconv.ParseBool(value) + result = b + + default: + return value + } + + if err != nil { + return value + } + + return result +} + +func (p *GenericParser) parseTimeRobust(value, layout string) (time.Time, error) { + if layout == "Jan 02 15:04:05" { + t, err := time.Parse(layout, value) + if err != nil { + return time.Time{}, err + } + now := time.Now() + year := now.Year() + if t.Month() > now.Month() { + year-- + } + return t.AddDate(year, 0, 0), nil + } + + return time.Parse(layout, value) +} + +func (p *GenericParser) mapField(entry *models.LogMessage, key string, value any) { + switch key { + case "timestamp", "time": + if t, ok := value.(time.Time); ok { + entry.Timestamp = t + } + case "log_level", "level": + entry.LogLevel = fmt.Sprintf("%v", value) + case "message", "msg": + entry.LogMessage = fmt.Sprintf("%v", value) + case "host", "hostname": + entry.Host = fmt.Sprintf("%v", value) + case "service": + entry.Service = fmt.Sprintf("%v", value) + case "pid": + if v, ok := value.(int); ok { + entry.PID = v + } else if vStr, ok := value.(string); ok { + if pid, err := strconv.Atoi(vStr); err == nil { + entry.PID = pid + } + } + + default: + entry.Fields[key] = value + } +} diff --git a/parser/generic_parser_test.go b/parser/generic_parser_test.go new file mode 100644 index 0000000..13cfbac --- /dev/null +++ b/parser/generic_parser_test.go @@ -0,0 +1,198 @@ +package parser + +import ( + "log/slog" + "os" + "path/filepath" + "testing" + "watch-tool/patterns" + + "codeberg.org/pata1704/drain3" +) + +func setupPatterns(t *testing.T) { + content := ` +patterns: + common: + extractors: + - name: "syslog_header" + regex: '^\w{3} \d{2} \d{2}:\d{2}:\d{2} (?P\S+) .*' + fields: + hostname: "string" + + test_service: + extractors: + - name: "data_line" + regex: 'Data: id=(?P\d+) size=(?P[0-9.]+) active=(?Ptrue|false) empty=(?P\S+)' + fields: + id: "int" + size_mb: "float" + is_active: "bool" + empty_val: "int" # Testet Fallback bei "-" +` + tmpfile, err := os.CreateTemp("", "patterns_parser_test_*.yaml") + if err != nil { + t.Fatal(err) + } + defer os.Remove(tmpfile.Name()) + + if _, err := tmpfile.Write([]byte(content)); err != nil { + t.Fatal(err) + } + tmpfile.Close() + + if err := patterns.GetInstance().Load(tmpfile.Name()); err != nil { + t.Fatal(err) + } +} + +func TestGenericParser_Parse_Regex(t *testing.T) { + setupPatterns(t) + + p := NewGenericParser("test_service", "localhost", nil, "") + + line := "Sep 28 10:00:00 myhost Data: id=42 size=12.5 active=true empty=-" + entry, err := p.Parse(line) + if err != nil { + t.Fatalf("Parse failed: %v", err) + } + + if entry.Host != "myhost" { + t.Errorf("Expected host 'myhost', got '%s'", entry.Host) + } + + if val, ok := entry.Fields["id"].(int); !ok || val != 42 { + t.Errorf("Expected id=42 (int), got %v (%T)", entry.Fields["id"], entry.Fields["id"]) + } + + if val, ok := entry.Fields["size_mb"].(float64); !ok || val != 12.5 { + t.Errorf("Expected size_mb=12.5 (float), got %v", entry.Fields["size_mb"]) + } + + if val, ok := entry.Fields["is_active"].(bool); !ok || val != true { + t.Errorf("Expected is_active=true, got %v", entry.Fields["is_active"]) + } + + if val, ok := entry.Fields["empty_val"].(int); !ok || val != 0 { + t.Errorf("Expected empty_val=0 for '-', got %v", entry.Fields["empty_val"]) + } +} + +func TestGenericParser_Drain3_Integration(t *testing.T) { + setupPatterns(t) + + opts := &slog.HandlerOptions{Level: slog.LevelDebug} + logger := slog.New(slog.NewTextHandler(os.Stdout, opts)) + slog.SetDefault(logger) + + tmpDir, err := os.MkdirTemp("", "drain_state_test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + drainCfg := &drain3.Config{ + Depth: 4, + SimTh: 0.5, + MaxChildren: 100, + MaxClusters: 100, + } + + serviceName := "test_service" + p := NewGenericParser(serviceName, "localhost", drainCfg, tmpDir) + + log1 := "User admin logged in from 192.168.1.1" + log2 := "User guest logged in from 10.0.0.1" + + entry1, _ := p.Parse(log1) + if entry1.Fields["drain_template_id"] == nil { + t.Error("Drain3 did not assign a template ID for log1") + } + + entry2, _ := p.Parse(log2) + + id1 := entry1.Fields["drain_template_id"] + id2 := entry2.Fields["drain_template_id"] + t.Logf("IDs: %v -> %v", id1, id2) + t.Logf("Template 2: %s", entry2.Fields["drain_template"]) + + if err := p.Close(); err != nil { + t.Fatalf("Close failed: %v", err) + } + + expectedFile := filepath.Join(tmpDir, serviceName+".bin") + + if info, err := os.Stat(expectedFile); os.IsNotExist(err) { + t.Errorf("Drain3 state file NOT found at: %s", expectedFile) + + entries, _ := os.ReadDir(tmpDir) + t.Logf("Listing directory %s:", tmpDir) + for _, e := range entries { + t.Logf(" - Found file: %s", e.Name()) + } + } else { + t.Logf("Success: State file found (%d bytes)", info.Size()) + } +} + +func TestGenericParser_Robustness(t *testing.T) { + setupPatterns(t) + + p := NewGenericParser("test_service", "localhost", nil, "") + + tests := []struct { + name string + log string + checkField string + expectedValue any + shouldFail bool + }{ + { + name: "Empty Line", + log: "", + checkField: "", + expectedValue: nil, + shouldFail: false, + }, + { + name: "Type Mismatch Int (Text instead of Int)", + log: "Data: id=abc size=12.5 active=true empty=-", + checkField: "_parse_status", + expectedValue: "failed", + }, + { + name: "Value Missing (Dash) for Int", + log: "Data: id=1 size=1.0 active=true empty=-", + checkField: "empty_val", + expectedValue: 0, + }, + { + name: "Value Missing (Dash) for Float", + log: "Data: id=1 size=1.0 active=true empty=0", + checkField: "size_mb", + expectedValue: 1.0, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + entry, err := p.Parse(tc.log) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if tc.checkField != "" { + val, exists := entry.Fields[tc.checkField] + if tc.expectedValue == "failed" { + if !exists || val != "failed" { + t.Errorf("Expected parse failure status, got %v", val) + } + } else { + if val != tc.expectedValue { + t.Errorf("Field %s: expected %v, got %v", tc.checkField, tc.expectedValue, val) + } + } + } + }) + } +} diff --git a/parser/json_parser.go b/parser/json_parser.go index fd3527e..ce270bf 100644 --- a/parser/json_parser.go +++ b/parser/json_parser.go @@ -3,7 +3,7 @@ package parser import ( "encoding/json" "log/slog" - "tixel_watch/models" + "watch-tool/models" ) type JSONParser struct{} @@ -17,3 +17,7 @@ func (j *JSONParser) Parse(line string) (models.LogMessage, error) { } return logMsg, nil } + +func (p *JSONParser) Close() error { + return nil +} diff --git a/parser/nginx_parser.go b/parser/nginx_parser.go deleted file mode 100644 index ee4da40..0000000 --- a/parser/nginx_parser.go +++ /dev/null @@ -1,57 +0,0 @@ -package parser - -import ( - "log/slog" - "regexp" - "strconv" - "strings" - "tixel_watch/models" -) - -var ( - nginxAccessPattern = regexp.MustCompile(`^(\S+)\s+\S+\s+(\S+)\s+\[([^\]]+)\]\s+"([^"]+)"\s+(\d+)\s+(\d+|-)\s*(?:"([^"]*)"\s+"([^"]*)")?`) -) - -type NginxParser struct{} - -func (n *NginxParser) Parse(line string) (models.LogMessage, error) { - newEntry := models.LogMessage{ - Service: "nginx", - } - - matches := nginxAccessPattern.FindStringSubmatch(strings.TrimSpace(line)) - if len(matches) < 7 { - return newEntry, nil - } - statusCode, err := strconv.ParseInt(matches[5], 10, 64) - if err != nil { - slog.Error("cant parse statuscode", "error", err) - } - bytesSend, err := strconv.ParseInt(matches[6], 10, 64) - if err != nil { - slog.Error("cant parse bytessend", "error", err) - } - baseInfo := models.NGinXBaseInfo{ - ClientIP: matches[1], - RemoteUser: matches[2], - Request: matches[4], - StatusCode: int(statusCode), - BytesSend: int(bytesSend), - } - - if len(matches) > 7 && matches[7] != "" { - baseInfo.Referer = matches[7] - } - if len(matches) > 8 && matches[8] != "" { - baseInfo.UserAgent = matches[8] - } - - if requestParts := strings.Fields(matches[4]); len(requestParts) >= 3 { - baseInfo.HTTPMethod = requestParts[0] - baseInfo.RequestURI = requestParts[1] - baseInfo.HTTPVersion = requestParts[2] - } - newEntry.ServiceInformation = baseInfo - - return newEntry, nil -} diff --git a/parser/nginx_tjm_parser.go b/parser/nginx_tjm_parser.go index d5f4f5e..2662a49 100644 --- a/parser/nginx_tjm_parser.go +++ b/parser/nginx_tjm_parser.go @@ -4,12 +4,13 @@ import ( "log/slog" "strconv" "strings" - "tixel_watch/helpers" - "tixel_watch/models" + "watch-tool/helpers" + "watch-tool/models" ) type NginxTJMLogParser struct { ToolName string + Hostname string } func (p *NginxTJMLogParser) Parse(line string) (models.LogMessage, error) { @@ -18,11 +19,7 @@ func (p *NginxTJMLogParser) Parse(line string) (models.LogMessage, error) { Tool: p.ToolName, Raw: line, } - hostname, err := helpers.GetHostname() - if err != nil { - return entry, err - } - entry.Host = hostname + entry.Host = p.Hostname entry = p.parseNginxTJM(entry) return entry, nil } diff --git a/parser/parser.go b/parser/parser.go index 480c741..52c01ad 100644 --- a/parser/parser.go +++ b/parser/parser.go @@ -1,11 +1,10 @@ package parser import ( - "tixel_watch/models" + "watch-tool/models" ) type Parser interface { - //TODO: Change parsers to return an error as well Parse(line string) (models.LogMessage, error) - // Parse(line string) models.LogMessage + Close() error } diff --git a/parser/regex_parser.go b/parser/regex_parser.go deleted file mode 100644 index d12a643..0000000 --- a/parser/regex_parser.go +++ /dev/null @@ -1,64 +0,0 @@ -package parser - -import ( - "log/slog" - "regexp" - "strings" - "tixel_watch/helpers" - "tixel_watch/models" -) - -type RegexLogParser struct { - Pattern *regexp.Regexp - Fields map[string]string - Toolname string -} - -func (p *RegexLogParser) Parse(line string) (models.LogMessage, error) { - entry := models.LogMessage{Type: "log_entry"} - entry.Tool = p.Toolname - entry.Raw = line - hostname, err := helpers.GetHostname() - if err != nil { - slog.Warn("cannot get hostname") - return entry, err - } - entry.Host = hostname - - fields := p.parseWithPattern(line) - if fields != nil { - entry.Fields = fields - } else { - entry.LogMessage = strings.TrimSpace(line) - } - - return entry, nil -} - -func (p *RegexLogParser) parseWithPattern(text string) map[string]any { - matches := p.Pattern.FindStringSubmatch(text) - if matches == nil { - return nil - } - - fields := make(map[string]any) - subexpNames := p.Pattern.SubexpNames() - - for i, match := range matches { - if i == 0 { - continue - } - - if i < len(subexpNames) && subexpNames[i] != "" { - fieldName := subexpNames[i] - - if mappedName, exists := p.Fields[fieldName]; exists { - fieldName = mappedName - } - - fields[fieldName] = match - } - } - - return fields -} diff --git a/parser/tcc_parser.go b/parser/tcc_parser.go deleted file mode 100644 index 2c117c9..0000000 --- a/parser/tcc_parser.go +++ /dev/null @@ -1,49 +0,0 @@ -package parser - -import ( - "log/slog" - "regexp" - "strings" - "time" - "tixel_watch/helpers" - "tixel_watch/models" -) - -var ( - tccServicePattern = regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z)\s+(\w+)\s+(\d+)\s+---\s+\[\s*([^\]]*)\]\s+([\w\.]+)\s*:\s*(.*)$`) -) - -type TCCParser struct{} - -func (t *TCCParser) Parse(line string) (models.LogMessage, error) { - newEntry := models.LogMessage{ - Service: "tixel-control-center", - } - syslogFields, logContent := helpers.ExtractSyslogHeader(line) - newEntry.Host = syslogFields.Hostname - - matches := tccServicePattern.FindStringSubmatch(strings.TrimSpace(logContent)) - if len(matches) != 7 { - newEntry.Timestamp = time.Now() - newEntry.LogMessage = line - return newEntry, nil - } - - timestampStr := strings.Join(strings.Split(matches[1], " "), "T") - timestamp, err := helpers.ParseRFC3339WithOptionalZ(timestampStr) - if err != nil { - slog.Error("unable to parse time", "error", err) - return newEntry, err - } - baseInfo := models.TCCBaseInfo{ - ProcessID: matches[3], - ThreadID: strings.TrimSpace(matches[4]), - LoggerName: matches[5], - } - newEntry.Timestamp = timestamp - newEntry.LogLevel = matches[2] - newEntry.LogMessage = matches[6] - newEntry.ServiceInformation = baseInfo - - return newEntry, nil -} diff --git a/parser/tjm_parser.go b/parser/tjm_parser.go deleted file mode 100644 index 850e21c..0000000 --- a/parser/tjm_parser.go +++ /dev/null @@ -1,91 +0,0 @@ -package parser - -import ( - "log/slog" - "regexp" - "strings" - "tixel_watch/helpers" - "tixel_watch/models" -) - -var ( - tjmServicePattern = regexp.MustCompile(`^(?