diff --git a/config.go b/config.go index 93f0a88..6527410 100644 --- a/config.go +++ b/config.go @@ -2,8 +2,8 @@ package main import ( "fmt" - "log" "log/slog" + "regexp" "time" "github.com/spf13/viper" @@ -96,6 +96,7 @@ type Config struct { Level string `mapstructure:"level"` FilePath string `mapstructure:"file_path"` } `mapstructure:"logging"` + PatternsFile string `mapstructure:"patterns_file"` } type StorageRotationConfig struct { @@ -125,30 +126,6 @@ func (src StorageRotationConfig) GetCheckInterval() time.Duration { return time.Duration(src.CheckIntervalMinutes) * time.Minute } -func LoadConfig() (*Config, error) { - viper.SetConfigName("config") - viper.AddConfigPath(".") - viper.AddConfigPath("/opt/tixel/tixel-watch/") - viper.SetConfigType("yaml") - - setConfigDefaults() - - if err := viper.ReadInConfig(); err != nil { - return nil, fmt.Errorf("error reading config: %w", err) - } - - var cfg Config - if err := viper.Unmarshal(&cfg); err != nil { - return nil, fmt.Errorf("error parsing config: %w", err) - } - - if err := validateConfig(&cfg); err != nil { - return nil, fmt.Errorf("config validation failed: %w", err) - } - - return &cfg, nil -} - func setConfigDefaults() { viper.SetDefault("poll_interval_seconds", 30) viper.SetDefault("elasticsearch.timeout", 30) @@ -162,21 +139,6 @@ func setConfigDefaults() { viper.SetDefault("web_service.port", 8080) viper.SetDefault("web_service.host", "localhost") viper.SetDefault("logging.level", "info") -} - -func setConfigDefaultsV2() { - viper.SetDefault("poll_interval_seconds", 30) - viper.SetDefault("elasticsearch.timeout", 30) - viper.SetDefault("system_metrics.enabled", true) - viper.SetDefault("system_metrics.collect_cpu", true) - viper.SetDefault("system_metrics.collect_memory", true) - viper.SetDefault("system_metrics.collect_disk", true) - viper.SetDefault("system_metrics.collect_network", false) - viper.SetDefault("system_metrics.disk_paths", []string{"/"}) - viper.SetDefault("web_service.enabled", false) - viper.SetDefault("web_service.port", 8080) - viper.SetDefault("web_service.host", "localhost") - viper.SetDefault("logging.level", "info") viper.SetDefault("export.enabled", true) viper.SetDefault("export.batch_size", 100) viper.SetDefault("export.export_interval", "30s") @@ -190,38 +152,16 @@ func setConfigDefaultsV2() { viper.SetDefault("localstorage.rotation.max_files", 7) viper.SetDefault("localstorage.rotation.check_interval_minutes", 5) viper.SetDefault("localstorage.rotation.archive_dir", "") + viper.SetDefault("patterns_file", "./configs/patterns.yaml") } -func validateConfig(cfg *Config) error { - if cfg.Elasticsearch.URL == "" { - return fmt.Errorf("elasticsearch.url is required") - } - - if cfg.Elasticsearch.Index == "" { - return fmt.Errorf("elasticsearch.index is required") - } - - if cfg.PollIntervalSeconds <= 0 { - log.Printf("Warning: poll_interval_seconds is %d, setting to 30", cfg.PollIntervalSeconds) - cfg.PollIntervalSeconds = 30 - } - - for i := range cfg.Tools { - if cfg.Tools[i].BufferSize <= 0 { - cfg.Tools[i].BufferSize = 100 - } - } - - return nil -} - -func LoadConfigV2() (*Config, error) { +func LoadConfig() (*Config, error) { viper.SetConfigName("config") viper.AddConfigPath(".") viper.AddConfigPath("/opt/tixel/tixel-watch/") viper.SetConfigType("yaml") - setConfigDefaultsV2() + setConfigDefaults() if err := viper.ReadInConfig(); err != nil { return nil, fmt.Errorf("error reading config: %w", err) @@ -232,14 +172,14 @@ func LoadConfigV2() (*Config, error) { return nil, fmt.Errorf("error parsing config: %w", err) } - if err := validateConfigV2(&cfg); err != nil { + if err := validateConfig(&cfg); err != nil { return nil, fmt.Errorf("config validation failed: %w", err) } return &cfg, nil } -func validateConfigV2(cfg *Config) error { +func validateConfig(cfg *Config) error { if !cfg.LocalStorage.Enable { return fmt.Errorf("local storage must be enabled in the new architecture") } @@ -280,9 +220,15 @@ func validateConfigV2(cfg *Config) error { } } - for i := range cfg.Tools { - if cfg.Tools[i].BufferSize <= 0 { - cfg.Tools[i].BufferSize = 100 + for _, tool := range cfg.Tools { + if tool.BufferSize <= 0 { + tool.BufferSize = 100 + } + + if tool.Format.Pattern != "" { + if _, err := regexp.Compile(tool.Format.Pattern); err != nil { + return fmt.Errorf("invalid regex for tool '%s': %w", tool.Name, err) + } } } diff --git a/configs/patterns.yml b/configs/patterns.yml new file mode 100644 index 0000000..069a38c --- /dev/null +++ b/configs/patterns.yml @@ -0,0 +1,164 @@ +patterns: + # =========================================================================== + # Common / Shared Patterns + # =========================================================================== + common: + extractors: + - name: "syslog_header" + regex: '^(\w{3} \d{2} \d{2}:\d{2}:\d{2}) (?P[^\s]+) (?P[^:]+):\s*(?P.*)$' + fields: + syslog_timestamp: "time:Jan 02 15:04:05" + hostname: "string" + process_info: "string" + message_rest: "string" + + - name: "timestamp_rfc3339" + regex: '(?P\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z?)' + fields: + timestamp: "time:2006-01-02T15:04:05.000000Z" + + # =========================================================================== + # TIXstream Service + # Deckt ab: tsServicePattern, tsTransferIDPattern, tsDetailPattern1-4 + # =========================================================================== + tixstream: + extractors: + - name: "service_log_base" + regex: '^(?P\S+)\s+(?P\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6})\s+(?P.*)' + fields: + log_level: "string" + timestamp: "time:2006-01-02 15:04:05.000000" + message: "string" + + - name: "transfer_id_extraction" + regex: '^(?P\w{8}-\w{4}-\w{4}-\w{4}-\w{12})\s+(?P.*)' + fields: + transfer_id: "string" + message: "string" + + - name: "transfer_start_in" + regex: 'in: Transfer start (?P\d+/\d+) buffers=(?P\d+) files=(?P\d+) size=(?P[0-9.]+) MByte chunksize=(?P\d+) streams=(?P\d+) target-datarate=(?P[0-9.]+) MByte/s protocol=(?P\w+) dest=(?P\S+) sender-id=(?P\S+)' + fields: + thread_info: "string" # z.B. "1/4" - Typisierung hier schwierig, also String + buffers: "int" + file_count: "int" + size_mb: "float" + chunk_size: "int" + streams: "int" + target_rate: "float" + protocol: "string" + destination: "string" + sender_id: "string" + direction: "string" # Wir können statische Felder im Parser injecten oder hier als "implizit" betrachten + + - name: "transfer_start_remote_out" + regex: 'out: Start remote transfer to (?P[^\s]+) request executed, duration=(?P[0-9.]+) s' + fields: + target: "string" + duration: "float" + + - name: "transfer_start_out" + regex: 'out: Transfer start (?P\d+/\d+) buffers=(?P\d+) files=(?P\d+) size=(?P[0-9.]+) MByte chunksize=(?P\d+) streams=(?P\d+) target-datarate=(?P[0-9.]+) MByte/s protocol=(?P\w+) src=(?P\S+) receiver=(?P\S+)' + fields: + thread_info: "string" + buffers: "int" + file_count: "int" + size_mb: "float" + chunk_size: "int" + streams: "int" + target_rate: "float" + protocol: "string" + source: "string" + receiver: "string" + + - name: "transfer_start_generic" + regex: 'out: Start transfer (?P\d+/\d+), src=(?P[^ ]*) dest=(?P[^ ]*) item\[0\]=(?P[^ ]*) count=(?P\d+)' + fields: + thread_info: "string" + source: "string" + destination: "string" + item0: "string" + count: "int" + + # =========================================================================== + # Transfer Job Manager (TJM) + # Deckt ab: tjmServicePattern, tjmTransferNamePattern, tjmTransferIDPattern1/2 + # =========================================================================== + transfer-job-manager: + extractors: + - name: "service_log_base" + regex: '^(?P\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\s+(?P\S+)\s+(?P\d+).*?\[(?P[^\]]*)\]\s+\[(?P[^\]]*)\]\s+\[(?P[^\]]*)\]\s+(?P.*?)\s+:\s+(?P.*)' + fields: + timestamp: "time:2006-01-02 15:04:05.000" + log_level: "string" + pid: "int" + correlation_id: "string" + username: "string" + thread_id: "string" + java_class: "string" + message: "string" + + - name: "transfer_name_info" + regex: '^(?P\d{8}T\d{6}-[A-Za-z0-9]+-.+?-(?:in|out)) ?: (?P.*)$' + fields: + transfer_name_raw: "string" + message: "string" + + - name: "transfer_id_mid" + regex: '(?P\w{8}-\w{4}-\w{4}-\w{4}-\w{12}).*?(?P.*)' + fields: + transfer_id: "string" + message: "string" + + - name: "transfer_id_prefixed" + regex: '(?P.*)(?P\w{8}-\w{4}-\w{4}-\w{4}-\w{12}).*?(?P.*)' + fields: + prefix: "string" + transfer_id: "string" + message: "string" + + # =========================================================================== + # Access Manager & TCC + # Deckt ab: amServicePattern, tccServicePattern + # =========================================================================== + access-manager: + extractors: + - name: "spring_boot_log" + regex: '^(?P\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z)\s+(?P\w+)\s+(?P\d+)\s+---\s+\[\s*(?P[^\]]*)\]\s+(?P[\w\.]+)\s*:\s+(?P.*)$' + fields: + timestamp: "time:2006-01-02T15:04:05.000000Z" + log_level: "string" + pid: "int" + thread_id: "string" + logger: "string" + message: "string" + + tixel-control-center: + extractors: + - name: "spring_boot_log" + regex: '^(?P\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z)\s+(?P\w+)\s+(?P\d+)\s+---\s+\[\s*(?P[^\]]*)\]\s+(?P[\w\.]+)\s*:\s+(?P.*)$' + fields: + timestamp: "time:2006-01-02T15:04:05.000000Z" + log_level: "string" + pid: "int" + thread_id: "string" + logger: "string" + message: "string" + + # =========================================================================== + # Nginx + # Deckt ab: nginxAccessPattern + # =========================================================================== + nginx: + extractors: + - name: "access_log" + regex: '^(?P\S+)\s+\S+\s+(?P\S+)\s+\[(?P[^\]]+)\]\s+"(?P[^"]+)"\s+(?P\d+)\s+(?P\d+|-)\s*(?:"(?P[^"]*)"\s+"(?P[^"]*)")?' + fields: + client_ip: "string" + remote_user: "string" + timestamp_nginx: "string" + request: "string" + status_code: "int" + bytes_sent: "int" + referer: "string" + user_agent: "string" diff --git a/elasticsearch.go b/elasticsearch.go index 3b3537d..6b217c8 100644 --- a/elasticsearch.go +++ b/elasticsearch.go @@ -7,7 +7,7 @@ import ( "log/slog" "strings" "time" - "tixel_watch/models" + "watch-tool/models" "github.com/elastic/go-elasticsearch/v8" ) diff --git a/elasticsearch_exporter.go b/elasticsearch_exporter.go index 0a36f5f..55f4dbe 100644 --- a/elasticsearch_exporter.go +++ b/elasticsearch_exporter.go @@ -4,280 +4,87 @@ import ( "context" "encoding/json" "fmt" - "io" "log/slog" "strings" "time" + "watch-tool/models" "github.com/elastic/go-elasticsearch/v8" ) type ElasticsearchExporter struct { client *elasticsearch.Client + config ElasticsearchConfig } -func NewElasticsearchExporter(client *elasticsearch.Client) *ElasticsearchExporter { +func NewElasticsearchExporter(config ElasticsearchConfig) (*ElasticsearchExporter, error) { + client, err := NewElasticsearchClient(config) + if err != nil { + return nil, fmt.Errorf("failed to create Elasticsearch client: %w", err) + } + return &ElasticsearchExporter{ client: client, - } + config: config, + }, nil } -type ExportResult struct { - Index string `json:"index"` - DocumentCount int `json:"document_count"` - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` - Duration string `json:"duration"` - Error string `json:"error,omitempty"` -} - -func (e *ElasticsearchExporter) ExportToStream(ctx context.Context, indices []string, batchSize int, since int, writer io.Writer) error { - startTime := time.Now() - - if _, err := writer.Write([]byte("{\n \"export_info\": {\n")); err != nil { - return fmt.Errorf("error writing export header: %w", err) +func (e *ElasticsearchExporter) Export(ctx context.Context, entries []models.LogMessage) error { + if len(entries) == 0 { + return nil } - exportInfo := map[string]any{ - "timestamp": startTime, - "indices": indices, - "batch_size": batchSize, - "sinceDays": since, - } + var body strings.Builder + for _, entry := range entries { + indexName := e.config.Index - infoBytes, err := json.MarshalIndent(exportInfo, " ", " ") - if err != nil { - return fmt.Errorf("error marshalling export info: %w", err) - } + indexLine := fmt.Sprintf(`{"index":{"_index":"%s"}}`, indexName) + body.WriteString(indexLine) + body.WriteString("\n") - infoStr := string(infoBytes) - infoStr = strings.TrimPrefix(infoStr, "{") - infoStr = strings.TrimSuffix(infoStr, "}") - - if _, err := writer.Write([]byte(infoStr)); err != nil { - return fmt.Errorf("error writing export info: %w", err) - } - - if _, err := writer.Write([]byte("\n },\n \"data\": {\n")); err != nil { - return fmt.Errorf("error writing data header: %w", err) - } - - results := make([]ExportResult, 0, len(indices)) - first := true - - for _, index := range indices { - if !first { - if _, err := writer.Write([]byte(",\n")); err != nil { - return fmt.Errorf("error writing separator: %w", err) - } - } - first = false - - result := e.exportIndex(ctx, index, batchSize, since, writer) - results = append(results, result) - - if result.Error != "" { - slog.Error("error exporting index", "index", index, "error", result.Error) + data, err := json.Marshal(entry) + if err != nil { + slog.Error("error marshalling JSON", "error", err) + continue } + body.WriteString(string(data)) + body.WriteString("\n") } - if _, err := writer.Write([]byte("\n },\n \"results\": ")); err != nil { - return fmt.Errorf("error writing results header: %w", err) - } + timeout := time.Duration(e.config.Timeout) * time.Second + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() - if err := json.NewEncoder(writer).Encode(results); err != nil { - return fmt.Errorf("error writing results: %w", err) - } - - if _, err := writer.Write([]byte("}\n")); err != nil { - return fmt.Errorf("error writing final bracket: %w", err) - } - - duration := time.Since(startTime) - slog.Info("Export completed", "duration", duration, "indices_count", len(indices)) - - return nil -} - -func (e *ElasticsearchExporter) exportIndex(ctx context.Context, index string, batchSize int, since int, writer io.Writer) ExportResult { - startTime := time.Now() - result := ExportResult{ - Index: index, - StartTime: startTime, - } - - if _, err := fmt.Fprintf(writer, " \"%s\": [\n", index); err != nil { - result.Error = fmt.Sprintf("error writing index header: %v", err) - result.EndTime = time.Now() - result.Duration = time.Since(startTime).String() - return result - } - - query := `{"query":{"match_all":{}}}` - if since > 0 { - query = fmt.Sprintf(`{ - "query": { - "range": { - "timestamp": { - "gte": "now-%dd/d", - "lt": "now/d" - } - } - } -}`, since) - } - res, err := e.client.Search( - e.client.Search.WithContext(ctx), - e.client.Search.WithIndex(index), - e.client.Search.WithScroll(1000), - e.client.Search.WithSize(batchSize), - e.client.Search.WithBody(strings.NewReader(query)), + res, err := e.client.Bulk( + strings.NewReader(body.String()), + e.client.Bulk.WithContext(ctx), ) if err != nil { - result.Error = fmt.Sprintf("error in initial search: %v", err) - result.EndTime = time.Now() - result.Duration = time.Since(startTime).String() - return result + return fmt.Errorf("bulk request error: %w", err) } defer res.Body.Close() if res.IsError() { - result.Error = fmt.Sprintf("elasticsearch error: %s", res.String()) - result.EndTime = time.Now() - result.Duration = time.Since(startTime).String() - return result + return fmt.Errorf("bulk request failed: %s", res.String()) } - var searchResult map[string]any - if err := json.NewDecoder(res.Body).Decode(&searchResult); err != nil { - result.Error = fmt.Sprintf("error decoding search result: %v", err) - result.EndTime = time.Now() - result.Duration = time.Since(startTime).String() - return result - } - - scrollID, ok := searchResult["_scroll_id"].(string) - if !ok { - result.Error = "no scroll_id found in search result" - result.EndTime = time.Now() - result.Duration = time.Since(startTime).String() - return result - } - - hits := searchResult["hits"].(map[string]any)["hits"].([]any) - firstDocument := true - documentCount := 0 - - for _, hit := range hits { - if !firstDocument { - if _, err := writer.Write([]byte(",\n")); err != nil { - result.Error = fmt.Sprintf("error writing separator: %v", err) - result.EndTime = time.Now() - result.Duration = time.Since(startTime).String() - return result - } - } - firstDocument = false - - source := hit.(map[string]any)["_source"] - if err := e.writeDocument(writer, source); err != nil { - result.Error = fmt.Sprintf("error writing document: %v", err) - result.EndTime = time.Now() - result.Duration = time.Since(startTime).String() - return result - } - documentCount++ - } - - for { - select { - case <-ctx.Done(): - result.Error = "context cancelled" - result.EndTime = time.Now() - result.Duration = time.Since(startTime).String() - return result - default: - } - - scrollRes, err := e.client.Scroll( - e.client.Scroll.WithScrollID(scrollID), - e.client.Scroll.WithScroll(1000), - e.client.Scroll.WithContext(ctx), - ) - if err != nil { - result.Error = fmt.Sprintf("error in scroll request: %v", err) - break - } - defer scrollRes.Body.Close() - - if scrollRes.IsError() { - result.Error = fmt.Sprintf("elasticsearch scroll error: %s", scrollRes.String()) - break - } - - var scrollResult map[string]any - if err := json.NewDecoder(scrollRes.Body).Decode(&scrollResult); err != nil { - result.Error = fmt.Sprintf("error decoding scroll result: %v", err) - break - } - - hits := scrollResult["hits"].(map[string]any)["hits"].([]any) - if len(hits) == 0 { - break - } - - scrollID, _ = scrollResult["_scroll_id"].(string) - - for _, hit := range hits { - if _, err := writer.Write([]byte(",\n")); err != nil { - result.Error = fmt.Sprintf("error writing separator: %v", err) - result.EndTime = time.Now() - result.Duration = time.Since(startTime).String() - return result - } - - source := hit.(map[string]any)["_source"] - if err := e.writeDocument(writer, source); err != nil { - result.Error = fmt.Sprintf("error writing document: %v", err) - result.EndTime = time.Now() - result.Duration = time.Since(startTime).String() - return result - } - documentCount++ - } - } - - if _, err := writer.Write([]byte("\n ]")); err != nil { - if result.Error == "" { - result.Error = fmt.Sprintf("error writing index footer: %v", err) - } - } - - result.DocumentCount = documentCount - result.EndTime = time.Now() - result.Duration = time.Since(startTime).String() - - slog.Info("Index export completed", - "index", index, - "documents", documentCount, - "duration", result.Duration, - ) - - return result + slog.Debug("Batch successfully exported to Elasticsearch", "count", len(entries)) + return nil } -func (e *ElasticsearchExporter) writeDocument(writer io.Writer, document any) error { - jsonBytes, err := json.MarshalIndent(document, " ", " ") +func (e *ElasticsearchExporter) HealthCheck(ctx context.Context) error { + timeout := time.Duration(e.config.Timeout) * time.Second + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + res, err := e.client.Info(e.client.Info.WithContext(ctx)) if err != nil { - return fmt.Errorf("error marshalling document: %w", err) + return fmt.Errorf("health check failed: %w", err) } + defer res.Body.Close() - if _, err := writer.Write([]byte(" ")); err != nil { - return err - } - - if _, err := writer.Write(jsonBytes); err != nil { - return err + if res.IsError() { + return fmt.Errorf("health check failed: %s", res.String()) } return nil diff --git a/elasticsearch_exporterV2.go b/elasticsearch_exporterV2.go deleted file mode 100644 index b0205e5..0000000 --- a/elasticsearch_exporterV2.go +++ /dev/null @@ -1,91 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "fmt" - "log/slog" - "strings" - "time" - "tixel_watch/models" - - "github.com/elastic/go-elasticsearch/v8" -) - -type ElasticsearchExporterV2 struct { - client *elasticsearch.Client - config ElasticsearchConfig -} - -func NewElasticsearchExporterV2(config ElasticsearchConfig) (*ElasticsearchExporterV2, error) { - client, err := NewElasticsearchClient(config) - if err != nil { - return nil, fmt.Errorf("failed to create Elasticsearch client: %w", err) - } - - return &ElasticsearchExporterV2{ - client: client, - config: config, - }, nil -} - -func (e *ElasticsearchExporterV2) Export(ctx context.Context, entries []models.LogMessage) error { - if len(entries) == 0 { - return nil - } - - var body strings.Builder - for _, entry := range entries { - indexName := e.config.Index - - indexLine := fmt.Sprintf(`{"index":{"_index":"%s"}}`, indexName) - body.WriteString(indexLine) - body.WriteString("\n") - - data, err := json.Marshal(entry) - if err != nil { - slog.Error("error marshalling JSON", "error", err) - continue - } - body.WriteString(string(data)) - body.WriteString("\n") - } - - timeout := time.Duration(e.config.Timeout) * time.Second - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - - res, err := e.client.Bulk( - strings.NewReader(body.String()), - e.client.Bulk.WithContext(ctx), - ) - if err != nil { - return fmt.Errorf("bulk request error: %w", err) - } - defer res.Body.Close() - - if res.IsError() { - return fmt.Errorf("bulk request failed: %s", res.String()) - } - - slog.Debug("Batch successfully exported to Elasticsearch", "count", len(entries)) - return nil -} - -func (e *ElasticsearchExporterV2) HealthCheck(ctx context.Context) error { - timeout := time.Duration(e.config.Timeout) * time.Second - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - - res, err := e.client.Info(e.client.Info.WithContext(ctx)) - if err != nil { - return fmt.Errorf("health check failed: %w", err) - } - defer res.Body.Close() - - if res.IsError() { - return fmt.Errorf("health check failed: %s", res.String()) - } - - return nil -} diff --git a/export_manager.go b/export_manager.go index bf0ee72..a7fe501 100644 --- a/export_manager.go +++ b/export_manager.go @@ -8,7 +8,7 @@ import ( "strings" "sync" "time" - "tixel_watch/models" + "watch-tool/models" ) type ExportManager struct { diff --git a/exporter_interface.go b/exporter_interface.go index 4b9d455..5753f9e 100644 --- a/exporter_interface.go +++ b/exporter_interface.go @@ -2,7 +2,7 @@ package main import ( "context" - "tixel_watch/models" + "watch-tool/models" ) type ExporterInterface interface { diff --git a/file_monitor.go b/file_monitor.go index ab2c1de..8f016b8 100644 --- a/file_monitor.go +++ b/file_monitor.go @@ -6,18 +6,19 @@ import ( "log/slog" "regexp" "strings" - "tixel_watch/models" - "tixel_watch/parser" + "watch-tool/models" + "watch-tool/parser" "github.com/hpcloud/tail" ) type FileMonitor struct { - config ToolConfig - parser parser.Parser + config ToolConfig + parser parser.Parser + hostname string } -func NewFileMonitor(config ToolConfig) *FileMonitor { +func NewFileMonitor(config ToolConfig, hostname string) *FileMonitor { var logParser parser.Parser if config.Format.Pattern != "" { @@ -34,15 +35,16 @@ func NewFileMonitor(config ToolConfig) *FileMonitor { } } else { var err error - logParser, err = parser.New(config.Name, "custom") + logParser, err = parser.New(config.Name, "custom", hostname) if err != nil { slog.Error("cannot get tool specific parser", "error", err) } } return &FileMonitor{ - config: config, - parser: logParser, + config: config, + parser: logParser, + hostname: hostname, } } diff --git a/go.mod b/go.mod index fdc6247..15dd72a 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module tixel_watch +module watch-tool go 1.24.1 @@ -8,6 +8,7 @@ require ( github.com/shirou/gopsutil v3.21.11+incompatible github.com/spf13/viper v1.20.1 golang.org/x/sys v0.34.0 + gopkg.in/yaml.v3 v3.0.1 modernc.org/sqlite v1.39.0 ) @@ -42,7 +43,6 @@ require ( golang.org/x/text v0.21.0 // indirect gopkg.in/fsnotify.v1 v1.4.7 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect modernc.org/libc v1.66.3 // indirect modernc.org/mathutil v1.7.1 // indirect modernc.org/memory v1.11.0 // indirect diff --git a/helpers/helpers.go b/helpers/helpers.go index e16183c..2a35776 100644 --- a/helpers/helpers.go +++ b/helpers/helpers.go @@ -3,11 +3,10 @@ package helpers import ( "fmt" "log/slog" - "os" "regexp" "strings" "time" - "tixel_watch/models" + "watch-tool/models" ) var ( @@ -76,11 +75,3 @@ func ParseSyslogTimeToRFC3339(syslogTime string) (time.Time, error) { t = t.AddDate(now.Year(), 0, 0) return t, nil } - -func GetHostname() (string, error) { - hostname, err := os.Hostname() - if err != nil { - hostname = "unknown" - } - return hostname, nil -} diff --git a/helpers/utils.go b/helpers/utils.go new file mode 100644 index 0000000..c8933bd --- /dev/null +++ b/helpers/utils.go @@ -0,0 +1,47 @@ +package helpers + +import ( + "context" + "fmt" + "log/slog" + "runtime/debug" +) + +type AppError struct { + Op string + Err error + Context string +} + +func (e *AppError) Error() string { + if e.Context != "" { + return fmt.Sprintf("%s: %v (%s)", e.Op, e.Err, e.Context) + } + return fmt.Sprintf("%s: %v", e.Op, e.Err) +} + +func (e *AppError) Unwrap() error { + return e.Err +} + +func NewAppError(op string, err error, ctx string) error { + return &AppError{Op: op, Err: err, Context: ctx} +} + +func SafeGo(ctx context.Context, name string, fn func()) { + go func() { + defer func() { + if r := recover(); r != nil { + stack := string(debug.Stack()) + slog.Error("CRITICAL: Panic recovered in goroutine", + "goroutine", name, + "panic", r, + "stack", stack, + ) + // Optional: Hier könnte man Metriken inkrementieren (siehe Observability) + } + }() + + fn() + }() +} diff --git a/local_storage.go b/local_storage.go index f4b6265..bafdb80 100644 --- a/local_storage.go +++ b/local_storage.go @@ -4,28 +4,277 @@ import ( "context" "database/sql" "encoding/json" - "tixel_watch/models" + "fmt" + "log/slog" + "os" + "path/filepath" + "sort" + "strings" + "sync" + "time" + "watch-tool/models" _ "modernc.org/sqlite" ) -type StorageService struct { - db *sql.DB +type SQLiteStorage struct { + db *sql.DB + dbPath string + rotationCfg StorageRotationConfig + rotationStop chan struct{} + rotationWg sync.WaitGroup + mu sync.RWMutex } -func NewStorageService(dbPath string) (*StorageService, error) { - db, err := sql.Open("sqlite", dbPath) +func DefaultRotationConfig() StorageRotationConfig { + return StorageRotationConfig{ + MaxSizeBytes: 100 * 1024 * 1024, // 100MB + MaxAgeHours: 48 * time.Hour, // 48 hours + MaxFiles: 3, // 3 old Files + CheckIntervalMinutes: 5 * time.Minute, // check every 5 minutes + ArchiveDir: "", // same directory + } +} + +func NewSQLiteStorage(dbPath string) (*SQLiteStorage, error) { + return NewSQLiteStorageWithRotation(dbPath, StorageRotationConfig{}) +} + +func NewSQLiteStorageWithRotation(dbPath string, rotationCfg StorageRotationConfig) (*SQLiteStorage, error) { + if rotationCfg.CheckIntervalMinutes == 0 { + rotationCfg = DefaultRotationConfig() + } + dsn := fmt.Sprintf("%s?_busy_timeout=5000&_journal_mode=WAL", dbPath) + + db, err := sql.Open("sqlite", dsn) + + if err != nil { + return nil, fmt.Errorf("failed to open SQLite database: %w", err) + } + + if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil { + return nil, fmt.Errorf("failed to enable WAL mode: %w", err) + } + + if err := createTables(db); err != nil { + return nil, fmt.Errorf("failed to create tables: %w", err) + } + + storage := &SQLiteStorage{ + db: db, + dbPath: dbPath, + rotationCfg: rotationCfg, + rotationStop: make(chan struct{}), + } + + if rotationCfg.MaxSizeBytes > 0 || rotationCfg.MaxAgeHours > 0 { + storage.rotationWg.Add(1) + go storage.rotationWorker() + slog.Info("Log rotation enabled", + "maxSize", rotationCfg.MaxSizeBytes, + "maxAge", rotationCfg.MaxAgeHours, + "maxFiles", rotationCfg.MaxFiles) + } + + return storage, nil +} + +func (s *SQLiteStorage) rotationWorker() { + defer s.rotationWg.Done() + ticker := time.NewTicker(s.rotationCfg.CheckIntervalMinutes) + defer ticker.Stop() + + for { + select { + case <-s.rotationStop: + return + case <-ticker.C: + if err := s.checkAndRotate(); err != nil { + slog.Error("Error during log rotation check", "error", err) + } + } + } +} + +func (s *SQLiteStorage) checkAndRotate() error { + s.mu.Lock() + defer s.mu.Unlock() + + needsRotation, reason, err := s.needsRotation() + if err != nil { + return fmt.Errorf("error checking rotation needs: %w", err) + } + + if needsRotation { + slog.Info("Starting log rotation", "reason", reason) + if err := s.rotateDatabase(); err != nil { + return fmt.Errorf("error rotating database: %w", err) + } + slog.Info("Log rotation completed successfully") + } + + return nil +} + +func (s *SQLiteStorage) needsRotation() (bool, string, error) { + if s.rotationCfg.MaxSizeBytes > 0 { + fileInfo, err := os.Stat(s.dbPath) + if err != nil { + return false, "", err + } + if fileInfo.Size() >= s.rotationCfg.MaxSizeBytes { + return true, fmt.Sprintf("file size %d >= max size %d", fileInfo.Size(), s.rotationCfg.MaxSizeBytes), nil + } + } + + if s.rotationCfg.MaxAgeHours > 0 { + fileInfo, err := os.Stat(s.dbPath) + if err != nil { + return false, "", err + } + age := time.Since(fileInfo.ModTime()) + if age >= s.rotationCfg.MaxAgeHours { + return true, fmt.Sprintf("file age %v >= max age %v", age, s.rotationCfg.MaxAgeHours), nil + } + } + + return false, "", nil +} + +func (s *SQLiteStorage) rotateDatabase() error { + if err := s.db.Close(); err != nil { + return fmt.Errorf("error closing database: %w", err) + } + + archivePath := s.generateArchivePath() + + if err := os.Rename(s.dbPath, archivePath); err != nil { + return fmt.Errorf("error moving database to archive: %w", err) + } + + db, err := sql.Open("sqlite", s.dbPath) + if err != nil { + return fmt.Errorf("error opening new database: %w", err) + } + + if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil { + return fmt.Errorf("failed to enable WAL mode on new database: %w", err) + } + + if err := createTables(db); err != nil { + return fmt.Errorf("failed to create tables in new database: %w", err) + } + + s.db = db + + if err := s.cleanupOldArchives(); err != nil { + slog.Warn("Error cleaning up old archives", "error", err) + } + + return nil +} + +func (s *SQLiteStorage) generateArchivePath() string { + dir := filepath.Dir(s.dbPath) + if s.rotationCfg.ArchiveDir != "" { + dir = s.rotationCfg.ArchiveDir + os.MkdirAll(dir, 0755) + } + + base := filepath.Base(s.dbPath) + ext := filepath.Ext(base) + name := strings.TrimSuffix(base, ext) + + timestamp := time.Now().Format("2006-01-02_15-04-05") + archiveName := fmt.Sprintf("%s.%s%s", name, timestamp, ext) + + return filepath.Join(dir, archiveName) +} + +func (s *SQLiteStorage) cleanupOldArchives() error { + if s.rotationCfg.MaxFiles <= 0 { + return nil + } + + dir := filepath.Dir(s.dbPath) + if s.rotationCfg.ArchiveDir != "" { + dir = s.rotationCfg.ArchiveDir + } + + base := filepath.Base(s.dbPath) + ext := filepath.Ext(base) + name := strings.TrimSuffix(base, ext) + pattern := fmt.Sprintf("%s.*%s", name, ext) + + files, err := filepath.Glob(filepath.Join(dir, pattern)) + if err != nil { + return err + } + + var archives []string + for _, file := range files { + if file != s.dbPath { + archives = append(archives, file) + } + } + + sort.Slice(archives, func(i, j int) bool { + infoI, _ := os.Stat(archives[i]) + infoJ, _ := os.Stat(archives[j]) + return infoI.ModTime().After(infoJ.ModTime()) + }) + + if len(archives) > s.rotationCfg.MaxFiles { + for _, file := range archives[s.rotationCfg.MaxFiles:] { + if err := os.Remove(file); err != nil { + slog.Warn("Error removing old archive", "file", file, "error", err) + } else { + slog.Info("Removed old archive", "file", file) + } + } + } + + return nil +} + +func (s *SQLiteStorage) ForceRotate() error { + s.mu.Lock() + defer s.mu.Unlock() + + slog.Info("Forcing log rotation") + return s.rotateDatabase() +} + +func (s *SQLiteStorage) GetRotationInfo() (map[string]any, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + fileInfo, err := os.Stat(s.dbPath) if err != nil { return nil, err } + info := map[string]any{ + "currentSize": fileInfo.Size(), + "maxSize": s.rotationCfg.MaxSizeBytes, + "currentAge": time.Since(fileInfo.ModTime()).String(), + "maxAge": s.rotationCfg.MaxAgeHours.String(), + "maxFiles": s.rotationCfg.MaxFiles, + "checkInterval": s.rotationCfg.CheckIntervalMinutes.String(), + "archiveDir": s.rotationCfg.ArchiveDir, + } + + return info, nil +} + +func createTables(db *sql.DB) error { createTableStmt := ` CREATE TABLE IF NOT EXISTS log_entries ( id INTEGER PRIMARY KEY AUTOINCREMENT, service TEXT, - timestamp DATETIME, - type TEXT, - host TEXT, + timestamp DATETIME NOT NULL, + type TEXT NOT NULL, + host TEXT NOT NULL, tool TEXT, log_level TEXT, log_message TEXT, @@ -39,143 +288,317 @@ func NewStorageService(dbPath string) (*StorageService, error) { fields TEXT, service_information TEXT, system_metrics TEXT, - tool_information TEXT - ); - ` - _, err = db.ExecContext(context.Background(), createTableStmt) - if err != nil { - return nil, err + tool_information TEXT, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + exported_at DATETIME + );` + + if _, err := db.Exec(createTableStmt); err != nil { + return err } - return &StorageService{db: db}, nil + indexes := []string{ + "CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp);", + "CREATE INDEX IF NOT EXISTS idx_service ON log_entries(service);", + "CREATE INDEX IF NOT EXISTS idx_type ON log_entries(type);", + "CREATE INDEX IF NOT EXISTS idx_tool ON log_entries(tool);", + "CREATE INDEX IF NOT EXISTS idx_log_level ON log_entries(log_level);", + "CREATE INDEX IF NOT EXISTS idx_exported ON log_entries(exported_at);", + "CREATE INDEX IF NOT EXISTS idx_composite ON log_entries(timestamp, type, service);", + } + + for _, index := range indexes { + if _, err := db.Exec(index); err != nil { + return fmt.Errorf("failed to create index: %w", err) + } + } + + return nil } -func (s *StorageService) Close() error { +func (s *SQLiteStorage) Store(ctx context.Context, entry *models.LogMessage) error { + return s.StoreBatch(ctx, []models.LogMessage{*entry}) +} + +func (s *SQLiteStorage) StoreBatch(ctx context.Context, entries []models.LogMessage) error { + if len(entries) == 0 { + return nil + } + + s.mu.RLock() + defer s.mu.RUnlock() + + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + defer tx.Rollback() + + stmt, err := tx.PrepareContext(ctx, ` + INSERT INTO log_entries + (service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, + unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`) + if err != nil { + return fmt.Errorf("failed to prepare statement: %w", err) + } + defer stmt.Close() + + for _, entry := range entries { + fieldsJSON, _ := json.Marshal(entry.Fields) + serviceInfoJSON, _ := json.Marshal(entry.ServiceInformation) + systemMetricsJSON, _ := json.Marshal(entry.SystemMetrics) + toolInfoJSON, _ := json.Marshal(entry.ToolInformation) + + _, err := stmt.ExecContext(ctx, + entry.Service, + entry.Timestamp, + entry.Type, + entry.Host, + entry.Tool, + entry.LogLevel, + entry.LogMessage, + entry.Raw, + entry.Priority, + entry.PriorityName, + entry.Unit, + entry.PID, + entry.BootID, + entry.MachineID, + string(fieldsJSON), + string(serviceInfoJSON), + string(systemMetricsJSON), + string(toolInfoJSON), + ) + if err != nil { + return fmt.Errorf("failed to insert entry: %w", err) + } + } + + return tx.Commit() +} + +func (s *SQLiteStorage) Query(ctx context.Context, query StorageQuery) ([]models.LogMessage, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + sqlQuery := "SELECT service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information FROM log_entries WHERE 1=1" + args := []any{} + argCount := 0 + + if !query.StartTime.IsZero() { + argCount++ + sqlQuery += fmt.Sprintf(" AND timestamp >= ?%d", argCount) + args = append(args, query.StartTime) + } + if !query.EndTime.IsZero() { + argCount++ + sqlQuery += fmt.Sprintf(" AND timestamp <= ?%d", argCount) + args = append(args, query.EndTime) + } + if query.Service != "" { + argCount++ + sqlQuery += fmt.Sprintf(" AND service = ?%d", argCount) + args = append(args, query.Service) + } + if query.Tool != "" { + argCount++ + sqlQuery += fmt.Sprintf(" AND tool = ?%d", argCount) + args = append(args, query.Tool) + } + if query.LogLevel != "" { + argCount++ + sqlQuery += fmt.Sprintf(" AND log_level = ?%d", argCount) + args = append(args, query.LogLevel) + } + if query.Type != "" { + argCount++ + sqlQuery += fmt.Sprintf(" AND type = ?%d", argCount) + args = append(args, query.Type) + } + + if query.OrderBy != "" { + direction := "ASC" + if query.OrderDesc { + direction = "DESC" + } + sqlQuery += fmt.Sprintf(" ORDER BY %s %s", query.OrderBy, direction) + } else { + sqlQuery += " ORDER BY timestamp DESC" + } + + if query.Limit > 0 { + sqlQuery += fmt.Sprintf(" LIMIT %d", query.Limit) + if query.Offset > 0 { + sqlQuery += fmt.Sprintf(" OFFSET %d", query.Offset) + } + } + + rows, err := s.db.QueryContext(ctx, sqlQuery, args...) + if err != nil { + return nil, fmt.Errorf("failed to execute query: %w", err) + } + defer rows.Close() + + var entries []models.LogMessage + for rows.Next() { + var entry models.LogMessage + var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string + + err := rows.Scan( + &entry.Service, + &entry.Timestamp, + &entry.Type, + &entry.Host, + &entry.Tool, + &entry.LogLevel, + &entry.LogMessage, + &entry.Raw, + &entry.Priority, + &entry.PriorityName, + &entry.Unit, + &entry.PID, + &entry.BootID, + &entry.MachineID, + &fieldsJSON, + &serviceInfoJSON, + &systemMetricsJSON, + &toolInfoJSON, + ) + if err != nil { + return nil, fmt.Errorf("failed to scan row: %w", err) + } + + if fieldsJSON != "" && fieldsJSON != "null" { + json.Unmarshal([]byte(fieldsJSON), &entry.Fields) + } + if serviceInfoJSON != "" && serviceInfoJSON != "null" { + json.Unmarshal([]byte(serviceInfoJSON), &entry.ServiceInformation) + } + if systemMetricsJSON != "" && systemMetricsJSON != "null" { + json.Unmarshal([]byte(systemMetricsJSON), &entry.SystemMetrics) + } + if toolInfoJSON != "" && toolInfoJSON != "null" { + json.Unmarshal([]byte(toolInfoJSON), &entry.ToolInformation) + } + + entries = append(entries, entry) + } + + return entries, rows.Err() +} + +func (s *SQLiteStorage) MarkAsExported(ctx context.Context, ids []int64) error { + if len(ids) == 0 { + return nil + } + + s.mu.RLock() + defer s.mu.RUnlock() + + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return err + } + + placeholders := strings.Repeat("?,", len(ids)) + placeholders = placeholders[:len(placeholders)-1] + + sqlQuery := fmt.Sprintf("UPDATE log_entries SET exported_at = CURRENT_TIMESTAMP WHERE id IN (%s)", placeholders) + + args := make([]any, len(ids)) + for i, id := range ids { + args[i] = id + } + + _, err = tx.ExecContext(ctx, sqlQuery, args...) + if err != nil { + tx.Rollback() + return err + } + + return tx.Commit() +} + +func (s *SQLiteStorage) GetUnexportedEntries(ctx context.Context, limit int) ([]models.LogMessage, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + sqlQuery := `SELECT id, service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, + unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information + FROM log_entries WHERE exported_at IS NULL ORDER BY timestamp ASC` + + if limit > 0 { + sqlQuery += fmt.Sprintf(" LIMIT %d", limit) + } + + rows, err := s.db.QueryContext(ctx, sqlQuery) + if err != nil { + return nil, fmt.Errorf("failed to execute query: %w", err) + } + defer rows.Close() + + var entries []models.LogMessage + for rows.Next() { + var entry models.LogMessage + var id int64 + var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string + + err := rows.Scan( + &id, + &entry.Service, + &entry.Timestamp, + &entry.Type, + &entry.Host, + &entry.Tool, + &entry.LogLevel, + &entry.LogMessage, + &entry.Raw, + &entry.Priority, + &entry.PriorityName, + &entry.Unit, + &entry.PID, + &entry.BootID, + &entry.MachineID, + &fieldsJSON, + &serviceInfoJSON, + &systemMetricsJSON, + &toolInfoJSON, + ) + if err != nil { + return nil, fmt.Errorf("failed to scan row: %w", err) + } + + if entry.Fields == nil { + entry.Fields = make(map[string]any) + } + entry.Fields["_internal_id"] = id + + if fieldsJSON != "" && fieldsJSON != "null" { + json.Unmarshal([]byte(fieldsJSON), &entry.Fields) + } + if serviceInfoJSON != "" && serviceInfoJSON != "null" { + json.Unmarshal([]byte(serviceInfoJSON), &entry.ServiceInformation) + } + if systemMetricsJSON != "" && systemMetricsJSON != "null" { + json.Unmarshal([]byte(systemMetricsJSON), &entry.SystemMetrics) + } + if toolInfoJSON != "" && toolInfoJSON != "null" { + json.Unmarshal([]byte(toolInfoJSON), &entry.ToolInformation) + } + + entries = append(entries, entry) + } + + return entries, rows.Err() +} + +func (s *SQLiteStorage) Close() error { + close(s.rotationStop) + s.rotationWg.Wait() + + s.mu.Lock() + defer s.mu.Unlock() + return s.db.Close() } - -func (s *StorageService) SaveLogEntry(ctx context.Context, entry *models.LogMessage) error { - fieldsJSON := "" - if entry.Fields != nil { - b, err := json.Marshal(entry.Fields) - if err != nil { - return err - } - fieldsJSON = string(b) - } - - serviceInfoJSON := "" - if entry.ServiceInformation != nil { - b, err := json.Marshal(entry.ServiceInformation) - if err != nil { - return err - } - serviceInfoJSON = string(b) - } - - systemMetricsJSON := "" - if entry.SystemMetrics != nil { - b, err := json.Marshal(entry.SystemMetrics) - if err != nil { - return err - } - systemMetricsJSON = string(b) - } - - toolInfoJSON := "" - if entry.ToolInformation != nil { - b, err := json.Marshal(entry.ToolInformation) - if err != nil { - return err - } - toolInfoJSON = string(b) - } - - stmt := ` - INSERT INTO log_entries - (service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - ` - _, err := s.db.ExecContext(ctx, stmt, - entry.Service, - entry.Timestamp, - entry.Type, - entry.Host, - entry.Tool, - entry.LogLevel, - entry.LogMessage, - entry.Raw, - entry.Priority, - entry.PriorityName, - entry.Unit, - entry.PID, - entry.BootID, - entry.MachineID, - fieldsJSON, - serviceInfoJSON, - systemMetricsJSON, - toolInfoJSON, - ) - return err -} - -func (s *StorageService) LoadLogEntry(ctx context.Context, id int64) (*models.LogMessage, error) { - row := s.db.QueryRowContext(ctx, "SELECT service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information FROM log_entries WHERE id = ?", id) - - var entry models.LogMessage - var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string - - err := row.Scan( - &entry.Service, - &entry.Timestamp, - &entry.Type, - &entry.Host, - &entry.Tool, - &entry.LogLevel, - &entry.LogMessage, - &entry.Raw, - &entry.Priority, - &entry.PriorityName, - &entry.Unit, - &entry.PID, - &entry.BootID, - &entry.MachineID, - &fieldsJSON, - &serviceInfoJSON, - &systemMetricsJSON, - &toolInfoJSON, - ) - if err != nil { - return nil, err - } - - if fieldsJSON != "" { - var fields map[string]any - if err = json.Unmarshal([]byte(fieldsJSON), &fields); err == nil { - entry.Fields = fields - } - } - - if serviceInfoJSON != "" { - var si any - if err = json.Unmarshal([]byte(serviceInfoJSON), &si); err == nil { - entry.ServiceInformation = si - } - } - - if systemMetricsJSON != "" { - var sm any - if err = json.Unmarshal([]byte(systemMetricsJSON), &sm); err == nil { - entry.SystemMetrics = sm - } - } - - if toolInfoJSON != "" { - var ti any - if err = json.Unmarshal([]byte(toolInfoJSON), &ti); err == nil { - entry.ToolInformation = ti - } - } - - return &entry, nil -} diff --git a/local_storageV2.go b/local_storageV2.go deleted file mode 100644 index f6c7dea..0000000 --- a/local_storageV2.go +++ /dev/null @@ -1,602 +0,0 @@ -package main - -import ( - "context" - "database/sql" - "encoding/json" - "fmt" - "log/slog" - "os" - "path/filepath" - "sort" - "strings" - "sync" - "time" - "tixel_watch/models" - - _ "modernc.org/sqlite" -) - -type SQLiteStorage struct { - db *sql.DB - dbPath string - rotationCfg StorageRotationConfig - rotationStop chan struct{} - rotationWg sync.WaitGroup - mu sync.RWMutex -} - -func DefaultRotationConfig() StorageRotationConfig { - return StorageRotationConfig{ - MaxSizeBytes: 100 * 1024 * 1024, // 100MB - MaxAgeHours: 48 * time.Hour, // 48 hours - MaxFiles: 3, // 3 old Files - CheckIntervalMinutes: 5 * time.Minute, // check every 5 minutes - ArchiveDir: "", // same directory - } -} - -func NewSQLiteStorage(dbPath string) (*SQLiteStorage, error) { - return NewSQLiteStorageWithRotation(dbPath, StorageRotationConfig{}) -} - -func NewSQLiteStorageWithRotation(dbPath string, rotationCfg StorageRotationConfig) (*SQLiteStorage, error) { - if rotationCfg.CheckIntervalMinutes == 0 { - rotationCfg = DefaultRotationConfig() - } - - db, err := sql.Open("sqlite", dbPath) - if err != nil { - return nil, fmt.Errorf("failed to open SQLite database: %w", err) - } - - if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil { - return nil, fmt.Errorf("failed to enable WAL mode: %w", err) - } - - if err := createTables(db); err != nil { - return nil, fmt.Errorf("failed to create tables: %w", err) - } - - storage := &SQLiteStorage{ - db: db, - dbPath: dbPath, - rotationCfg: rotationCfg, - rotationStop: make(chan struct{}), - } - - if rotationCfg.MaxSizeBytes > 0 || rotationCfg.MaxAgeHours > 0 { - storage.rotationWg.Add(1) - go storage.rotationWorker() - slog.Info("Log rotation enabled", - "maxSize", rotationCfg.MaxSizeBytes, - "maxAge", rotationCfg.MaxAgeHours, - "maxFiles", rotationCfg.MaxFiles) - } - - return storage, nil -} - -func (s *SQLiteStorage) rotationWorker() { - defer s.rotationWg.Done() - ticker := time.NewTicker(s.rotationCfg.CheckIntervalMinutes) - defer ticker.Stop() - - for { - select { - case <-s.rotationStop: - return - case <-ticker.C: - if err := s.checkAndRotate(); err != nil { - slog.Error("Error during log rotation check", "error", err) - } - } - } -} - -func (s *SQLiteStorage) checkAndRotate() error { - s.mu.Lock() - defer s.mu.Unlock() - - needsRotation, reason, err := s.needsRotation() - if err != nil { - return fmt.Errorf("error checking rotation needs: %w", err) - } - - if needsRotation { - slog.Info("Starting log rotation", "reason", reason) - if err := s.rotateDatabase(); err != nil { - return fmt.Errorf("error rotating database: %w", err) - } - slog.Info("Log rotation completed successfully") - } - - return nil -} - -func (s *SQLiteStorage) needsRotation() (bool, string, error) { - if s.rotationCfg.MaxSizeBytes > 0 { - fileInfo, err := os.Stat(s.dbPath) - if err != nil { - return false, "", err - } - if fileInfo.Size() >= s.rotationCfg.MaxSizeBytes { - return true, fmt.Sprintf("file size %d >= max size %d", fileInfo.Size(), s.rotationCfg.MaxSizeBytes), nil - } - } - - if s.rotationCfg.MaxAgeHours > 0 { - fileInfo, err := os.Stat(s.dbPath) - if err != nil { - return false, "", err - } - age := time.Since(fileInfo.ModTime()) - if age >= s.rotationCfg.MaxAgeHours { - return true, fmt.Sprintf("file age %v >= max age %v", age, s.rotationCfg.MaxAgeHours), nil - } - } - - return false, "", nil -} - -func (s *SQLiteStorage) rotateDatabase() error { - if err := s.db.Close(); err != nil { - return fmt.Errorf("error closing database: %w", err) - } - - archivePath := s.generateArchivePath() - - if err := os.Rename(s.dbPath, archivePath); err != nil { - return fmt.Errorf("error moving database to archive: %w", err) - } - - db, err := sql.Open("sqlite", s.dbPath) - if err != nil { - return fmt.Errorf("error opening new database: %w", err) - } - - if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil { - return fmt.Errorf("failed to enable WAL mode on new database: %w", err) - } - - if err := createTables(db); err != nil { - return fmt.Errorf("failed to create tables in new database: %w", err) - } - - s.db = db - - if err := s.cleanupOldArchives(); err != nil { - slog.Warn("Error cleaning up old archives", "error", err) - } - - return nil -} - -func (s *SQLiteStorage) generateArchivePath() string { - dir := filepath.Dir(s.dbPath) - if s.rotationCfg.ArchiveDir != "" { - dir = s.rotationCfg.ArchiveDir - os.MkdirAll(dir, 0755) - } - - base := filepath.Base(s.dbPath) - ext := filepath.Ext(base) - name := strings.TrimSuffix(base, ext) - - timestamp := time.Now().Format("2006-01-02_15-04-05") - archiveName := fmt.Sprintf("%s.%s%s", name, timestamp, ext) - - return filepath.Join(dir, archiveName) -} - -func (s *SQLiteStorage) cleanupOldArchives() error { - if s.rotationCfg.MaxFiles <= 0 { - return nil - } - - dir := filepath.Dir(s.dbPath) - if s.rotationCfg.ArchiveDir != "" { - dir = s.rotationCfg.ArchiveDir - } - - base := filepath.Base(s.dbPath) - ext := filepath.Ext(base) - name := strings.TrimSuffix(base, ext) - pattern := fmt.Sprintf("%s.*%s", name, ext) - - files, err := filepath.Glob(filepath.Join(dir, pattern)) - if err != nil { - return err - } - - var archives []string - for _, file := range files { - if file != s.dbPath { - archives = append(archives, file) - } - } - - sort.Slice(archives, func(i, j int) bool { - infoI, _ := os.Stat(archives[i]) - infoJ, _ := os.Stat(archives[j]) - return infoI.ModTime().After(infoJ.ModTime()) - }) - - if len(archives) > s.rotationCfg.MaxFiles { - for _, file := range archives[s.rotationCfg.MaxFiles:] { - if err := os.Remove(file); err != nil { - slog.Warn("Error removing old archive", "file", file, "error", err) - } else { - slog.Info("Removed old archive", "file", file) - } - } - } - - return nil -} - -func (s *SQLiteStorage) ForceRotate() error { - s.mu.Lock() - defer s.mu.Unlock() - - slog.Info("Forcing log rotation") - return s.rotateDatabase() -} - -func (s *SQLiteStorage) GetRotationInfo() (map[string]any, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - fileInfo, err := os.Stat(s.dbPath) - if err != nil { - return nil, err - } - - info := map[string]any{ - "currentSize": fileInfo.Size(), - "maxSize": s.rotationCfg.MaxSizeBytes, - "currentAge": time.Since(fileInfo.ModTime()).String(), - "maxAge": s.rotationCfg.MaxAgeHours.String(), - "maxFiles": s.rotationCfg.MaxFiles, - "checkInterval": s.rotationCfg.CheckIntervalMinutes.String(), - "archiveDir": s.rotationCfg.ArchiveDir, - } - - return info, nil -} - -func createTables(db *sql.DB) error { - createTableStmt := ` - CREATE TABLE IF NOT EXISTS log_entries ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - service TEXT, - timestamp DATETIME NOT NULL, - type TEXT NOT NULL, - host TEXT NOT NULL, - tool TEXT, - log_level TEXT, - log_message TEXT, - raw TEXT, - priority TEXT, - priority_name TEXT, - unit TEXT, - pid INTEGER, - boot_id TEXT, - machine_id TEXT, - fields TEXT, - service_information TEXT, - system_metrics TEXT, - tool_information TEXT, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - exported_at DATETIME - );` - - if _, err := db.Exec(createTableStmt); err != nil { - return err - } - - indexes := []string{ - "CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp);", - "CREATE INDEX IF NOT EXISTS idx_service ON log_entries(service);", - "CREATE INDEX IF NOT EXISTS idx_type ON log_entries(type);", - "CREATE INDEX IF NOT EXISTS idx_tool ON log_entries(tool);", - "CREATE INDEX IF NOT EXISTS idx_log_level ON log_entries(log_level);", - "CREATE INDEX IF NOT EXISTS idx_exported ON log_entries(exported_at);", - "CREATE INDEX IF NOT EXISTS idx_composite ON log_entries(timestamp, type, service);", - } - - for _, index := range indexes { - if _, err := db.Exec(index); err != nil { - return fmt.Errorf("failed to create index: %w", err) - } - } - - return nil -} - -func (s *SQLiteStorage) Store(ctx context.Context, entry *models.LogMessage) error { - return s.StoreBatch(ctx, []models.LogMessage{*entry}) -} - -func (s *SQLiteStorage) StoreBatch(ctx context.Context, entries []models.LogMessage) error { - if len(entries) == 0 { - return nil - } - - s.mu.RLock() - defer s.mu.RUnlock() - - tx, err := s.db.BeginTx(ctx, nil) - if err != nil { - return fmt.Errorf("failed to begin transaction: %w", err) - } - defer tx.Rollback() - - stmt, err := tx.PrepareContext(ctx, ` - INSERT INTO log_entries - (service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, - unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`) - if err != nil { - return fmt.Errorf("failed to prepare statement: %w", err) - } - defer stmt.Close() - - for _, entry := range entries { - fieldsJSON, _ := json.Marshal(entry.Fields) - serviceInfoJSON, _ := json.Marshal(entry.ServiceInformation) - systemMetricsJSON, _ := json.Marshal(entry.SystemMetrics) - toolInfoJSON, _ := json.Marshal(entry.ToolInformation) - - _, err := stmt.ExecContext(ctx, - entry.Service, - entry.Timestamp, - entry.Type, - entry.Host, - entry.Tool, - entry.LogLevel, - entry.LogMessage, - entry.Raw, - entry.Priority, - entry.PriorityName, - entry.Unit, - entry.PID, - entry.BootID, - entry.MachineID, - string(fieldsJSON), - string(serviceInfoJSON), - string(systemMetricsJSON), - string(toolInfoJSON), - ) - if err != nil { - return fmt.Errorf("failed to insert entry: %w", err) - } - } - - return tx.Commit() -} - -func (s *SQLiteStorage) Query(ctx context.Context, query StorageQuery) ([]models.LogMessage, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - sqlQuery := "SELECT service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information FROM log_entries WHERE 1=1" - args := []any{} - argCount := 0 - - if !query.StartTime.IsZero() { - argCount++ - sqlQuery += fmt.Sprintf(" AND timestamp >= ?%d", argCount) - args = append(args, query.StartTime) - } - if !query.EndTime.IsZero() { - argCount++ - sqlQuery += fmt.Sprintf(" AND timestamp <= ?%d", argCount) - args = append(args, query.EndTime) - } - if query.Service != "" { - argCount++ - sqlQuery += fmt.Sprintf(" AND service = ?%d", argCount) - args = append(args, query.Service) - } - if query.Tool != "" { - argCount++ - sqlQuery += fmt.Sprintf(" AND tool = ?%d", argCount) - args = append(args, query.Tool) - } - if query.LogLevel != "" { - argCount++ - sqlQuery += fmt.Sprintf(" AND log_level = ?%d", argCount) - args = append(args, query.LogLevel) - } - if query.Type != "" { - argCount++ - sqlQuery += fmt.Sprintf(" AND type = ?%d", argCount) - args = append(args, query.Type) - } - - if query.OrderBy != "" { - direction := "ASC" - if query.OrderDesc { - direction = "DESC" - } - sqlQuery += fmt.Sprintf(" ORDER BY %s %s", query.OrderBy, direction) - } else { - sqlQuery += " ORDER BY timestamp DESC" - } - - if query.Limit > 0 { - sqlQuery += fmt.Sprintf(" LIMIT %d", query.Limit) - if query.Offset > 0 { - sqlQuery += fmt.Sprintf(" OFFSET %d", query.Offset) - } - } - - rows, err := s.db.QueryContext(ctx, sqlQuery, args...) - if err != nil { - return nil, fmt.Errorf("failed to execute query: %w", err) - } - defer rows.Close() - - var entries []models.LogMessage - for rows.Next() { - var entry models.LogMessage - var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string - - err := rows.Scan( - &entry.Service, - &entry.Timestamp, - &entry.Type, - &entry.Host, - &entry.Tool, - &entry.LogLevel, - &entry.LogMessage, - &entry.Raw, - &entry.Priority, - &entry.PriorityName, - &entry.Unit, - &entry.PID, - &entry.BootID, - &entry.MachineID, - &fieldsJSON, - &serviceInfoJSON, - &systemMetricsJSON, - &toolInfoJSON, - ) - if err != nil { - return nil, fmt.Errorf("failed to scan row: %w", err) - } - - if fieldsJSON != "" && fieldsJSON != "null" { - json.Unmarshal([]byte(fieldsJSON), &entry.Fields) - } - if serviceInfoJSON != "" && serviceInfoJSON != "null" { - json.Unmarshal([]byte(serviceInfoJSON), &entry.ServiceInformation) - } - if systemMetricsJSON != "" && systemMetricsJSON != "null" { - json.Unmarshal([]byte(systemMetricsJSON), &entry.SystemMetrics) - } - if toolInfoJSON != "" && toolInfoJSON != "null" { - json.Unmarshal([]byte(toolInfoJSON), &entry.ToolInformation) - } - - entries = append(entries, entry) - } - - return entries, rows.Err() -} - -func (s *SQLiteStorage) MarkAsExported(ctx context.Context, ids []int64) error { - if len(ids) == 0 { - return nil - } - - s.mu.RLock() - defer s.mu.RUnlock() - - tx, err := s.db.BeginTx(ctx, nil) - if err != nil { - return err - } - - placeholders := strings.Repeat("?,", len(ids)) - placeholders = placeholders[:len(placeholders)-1] - - sqlQuery := fmt.Sprintf("UPDATE log_entries SET exported_at = CURRENT_TIMESTAMP WHERE id IN (%s)", placeholders) - - args := make([]any, len(ids)) - for i, id := range ids { - args[i] = id - } - - _, err = tx.ExecContext(ctx, sqlQuery, args...) - if err != nil { - tx.Rollback() - return err - } - - return tx.Commit() -} - -func (s *SQLiteStorage) GetUnexportedEntries(ctx context.Context, limit int) ([]models.LogMessage, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - sqlQuery := `SELECT id, service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, - unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information - FROM log_entries WHERE exported_at IS NULL ORDER BY timestamp ASC` - - if limit > 0 { - sqlQuery += fmt.Sprintf(" LIMIT %d", limit) - } - - rows, err := s.db.QueryContext(ctx, sqlQuery) - if err != nil { - return nil, fmt.Errorf("failed to execute query: %w", err) - } - defer rows.Close() - - var entries []models.LogMessage - for rows.Next() { - var entry models.LogMessage - var id int64 - var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string - - err := rows.Scan( - &id, - &entry.Service, - &entry.Timestamp, - &entry.Type, - &entry.Host, - &entry.Tool, - &entry.LogLevel, - &entry.LogMessage, - &entry.Raw, - &entry.Priority, - &entry.PriorityName, - &entry.Unit, - &entry.PID, - &entry.BootID, - &entry.MachineID, - &fieldsJSON, - &serviceInfoJSON, - &systemMetricsJSON, - &toolInfoJSON, - ) - if err != nil { - return nil, fmt.Errorf("failed to scan row: %w", err) - } - - if entry.Fields == nil { - entry.Fields = make(map[string]any) - } - entry.Fields["_internal_id"] = id - - if fieldsJSON != "" && fieldsJSON != "null" { - json.Unmarshal([]byte(fieldsJSON), &entry.Fields) - } - if serviceInfoJSON != "" && serviceInfoJSON != "null" { - json.Unmarshal([]byte(serviceInfoJSON), &entry.ServiceInformation) - } - if systemMetricsJSON != "" && systemMetricsJSON != "null" { - json.Unmarshal([]byte(systemMetricsJSON), &entry.SystemMetrics) - } - if toolInfoJSON != "" && toolInfoJSON != "null" { - json.Unmarshal([]byte(toolInfoJSON), &entry.ToolInformation) - } - - entries = append(entries, entry) - } - - return entries, rows.Err() -} - -func (s *SQLiteStorage) Close() error { - close(s.rotationStop) - s.rotationWg.Wait() - - s.mu.Lock() - defer s.mu.Unlock() - - return s.db.Close() -} diff --git a/log_processor.go b/log_processor.go index ab0fd25..cb11b84 100644 --- a/log_processor.go +++ b/log_processor.go @@ -4,7 +4,7 @@ import ( "context" "log/slog" "time" - "tixel_watch/models" + "watch-tool/models" ) type LogProcessor struct { diff --git a/main.go b/main.go index 9ae138a..5c67513 100644 --- a/main.go +++ b/main.go @@ -8,26 +8,36 @@ import ( "sync" "syscall" "time" - "tixel_watch/models" + "watch-tool/helpers" + "watch-tool/models" + "watch-tool/patterns" ) -var hostname string +var currentHostname string func init() { var err error - hostname, err = os.Hostname() + currentHostname, err = os.Hostname() if err != nil { - hostname = "unknown" + currentHostname = "unknown" + slog.Warn("Could not determine hostname, using fallback", "fallback", currentHostname) } } func main() { - cfg, err := LoadConfigV2() + cfg, err := LoadConfig() if err != nil { - slog.Error("error loading configuration", "error", err) + slog.Error("Startup failed: configuration error", "error", err) os.Exit(1) } - slog.Info("TIXEL System Monitor started") + + slog.Info("System Monitor started", "hostname", currentHostname) + + if err := patterns.GetInstance().Load(cfg.PatternsFile); err != nil { + slog.Error("Startup failed: could not load patterns", "file", cfg.PatternsFile, "error", err) + os.Exit(1) + } + slog.Info("Regex patterns loaded successfully", "file", cfg.PatternsFile) var storage StorageInterface if cfg.LocalStorage.Enable { @@ -46,7 +56,7 @@ func main() { } storage = sqliteStorage defer storage.Close() - slog.Info("SQLite storage with rotation initialized", "path", cfg.LocalStorage.DBPath) + slog.Info("SQLite storage initialized", "path", cfg.LocalStorage.DBPath) } else { slog.Error("Local storage is disabled, but it's required for the new architecture") os.Exit(1) @@ -65,7 +75,7 @@ func main() { exportManager = NewExportManager(storage, exportConfig) if cfg.Elasticsearch.Enabled { - esExporter, err := NewElasticsearchExporterV2(cfg.Elasticsearch) + esExporter, err := NewElasticsearchExporter(cfg.Elasticsearch) if err != nil { slog.Error("failed to create Elasticsearch exporter", "error", err) os.Exit(1) @@ -79,10 +89,6 @@ func main() { exportManager.RegisterExporter("elasticsearch", esExporter) slog.Info("Elasticsearch exporter registered") } - - // Add more exporters here in the future - // exportManager.RegisterExporter("checkmk", checkmkExporter) - // exportManager.RegisterExporter("grafana", grafanaExporter) } logChan := make(chan models.LogMessage, 1000) @@ -92,86 +98,92 @@ func main() { var wg sync.WaitGroup wg.Add(1) - go func() { + helpers.SafeGo(ctx, "LogProcessor", func() { defer wg.Done() processor := NewLogProcessor(storage) processor.Start(ctx, logChan) - }() + }) if exportManager != nil { wg.Add(1) - go func() { + helpers.SafeGo(ctx, "ExportManager", func() { defer wg.Done() exportManager.Start(ctx) - }() + }) } for _, service := range cfg.Services { if !service.Enabled { - slog.Info("Service deactivated, skipping...", "service", service.Name) + slog.Debug("Service deactivated, skipping...", "service", service.Name) continue } wg.Add(1) - go func(s ServiceConfig) { - defer wg.Done() - monitor := NewServiceMonitor(s) - if err := monitor.Start(ctx, logChan); err != nil { - slog.Error("error watching service", "service", s.Name, "error", err) - } - }(service) + srv := service - slog.Info("started watching Service-Log", "service", service.Name) + helpers.SafeGo(ctx, "ServiceMonitor-"+srv.Name, func() { + defer wg.Done() + monitor := NewServiceMonitor(srv, currentHostname) + + if err := monitor.Start(ctx, logChan); err != nil { + slog.Error("Error watching service", "service", srv.Name, "error", err) + } + }) + + slog.Info("Started watching Service-Log", "service", service.Name) } for _, tool := range cfg.Tools { if !tool.Enabled { - slog.Info("Tool is deactivated, skipping...", "tool", tool.Name) + slog.Debug("Tool is deactivated, skipping...", "tool", tool.Name) continue } wg.Add(1) - go func(t ToolConfig) { - defer wg.Done() - monitor := NewFileMonitor(t) - if err := monitor.Start(ctx, logChan); err != nil { - slog.Error("error watching", "tool", t.Name, "error", err) - } - }(tool) + t := tool - slog.Info("started watching logs", "tool", tool.Name, "file", tool.LogFile) + helpers.SafeGo(ctx, "FileMonitor-"+t.Name, func() { + defer wg.Done() + + monitor := NewFileMonitor(t, currentHostname) + + if err := monitor.Start(ctx, logChan); err != nil { + slog.Error("Error watching tool", "tool", t.Name, "error", err) + } + }) + + slog.Info("Started watching logs", "tool", tool.Name, "file", tool.LogFile) } if cfg.SystemMetrics.Enabled { wg.Add(1) - go func() { + helpers.SafeGo(ctx, "SystemMetrics", func() { defer wg.Done() - collector := NewSystemMetricsCollector(cfg.SystemMetrics, cfg.PollIntervalSeconds) - collector.StartV2(ctx, storage, logChan) - }() + collector := NewSystemMetricsCollector(cfg.SystemMetrics, cfg.PollIntervalSeconds, currentHostname) + collector.Start(ctx, storage, logChan) + }) slog.Info("Started collecting System-Metrics") } if cfg.WebService.Enabled { wg.Add(1) - go func() { + helpers.SafeGo(ctx, "WebService", func() { defer wg.Done() - webService := NewWebServiceV2(cfg, storage) + webService := NewWebService(cfg, storage) if err := webService.Start(ctx); err != nil { - slog.Error("web service error", "error", err) + slog.Error("Web service error", "error", err) } - }() + }) slog.Info("Web service started", "host", cfg.WebService.Host, "port", cfg.WebService.Port) } sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) - <-sigCh - slog.Info("Shutdown-Signal received, stopping threads...") + s := <-sigCh + slog.Info("Shutdown signal received, stopping threads...", "signal", s) cancel() - close(logChan) done := make(chan struct{}) go func() { @@ -181,9 +193,11 @@ func main() { select { case <-done: - slog.Info("All threads closed") + close(logChan) + slog.Info("All threads closed gracefully") case <-time.After(10 * time.Second): - slog.Info("Shutdown-Timeout reached, force quitting") + slog.Error("Shutdown timeout reached, force quitting") + os.Exit(1) } slog.Info("Program stopped") diff --git a/parser/am_parser.go b/parser/am_parser.go index cfd9950..0b8c813 100644 --- a/parser/am_parser.go +++ b/parser/am_parser.go @@ -5,8 +5,8 @@ import ( "regexp" "strings" "time" - "tixel_watch/helpers" - "tixel_watch/models" + "watch-tool/helpers" + "watch-tool/models" ) var ( diff --git a/parser/default_parser.go b/parser/default_parser.go index 664fc52..fcf8a80 100644 --- a/parser/default_parser.go +++ b/parser/default_parser.go @@ -3,12 +3,13 @@ package parser import ( "strings" "time" - "tixel_watch/models" + "watch-tool/models" ) type DefaultParser struct { - Service string - Tool string + Service string + Tool string + Hostname string } func (d *DefaultParser) Parse(line string) (models.LogMessage, error) { diff --git a/parser/factory.go b/parser/factory.go index fb6bc15..4293b0f 100644 --- a/parser/factory.go +++ b/parser/factory.go @@ -1,27 +1,10 @@ package parser -func New(serviceName, logType string) (Parser, error) { +func New(serviceName, logType, hostname string) (Parser, error) { switch logType { - case "custom": - switch serviceName { - case "tixstream": - return &TSParser{}, nil - case "transfer-job-manager": - return &TJMParser{}, nil - case "access-manager": - return &arser{}, nil - case "tixel-control-center": - return &TCCParser{}, nil - case "nginx": - return &NginxParser{}, nil - case "nginx-tjm": - return &NginxTJMLogParser{ToolName: serviceName}, nil - default: - return &DefaultParser{Service: serviceName}, nil - } case "json": return &JSONParser{}, nil default: - return &DefaultParser{Service: serviceName}, nil + return NewGenericParser(serviceName, hostname), nil } } diff --git a/parser/generic_parser.go b/parser/generic_parser.go new file mode 100644 index 0000000..58c94c9 --- /dev/null +++ b/parser/generic_parser.go @@ -0,0 +1,296 @@ +// package parser + +// import ( +// "fmt" +// "strconv" +// "strings" +// "time" +// "watch-tool/models" +// "watch-tool/patterns" +// ) + +// type GenericParser struct { +// ServiceName string +// Hostname string +// Extractors []patterns.CompiledExtractor +// CommonExt []patterns.CompiledExtractor +// } + +// func NewGenericParser(serviceName, hostname string) *GenericParser { +// repo := patterns.GetInstance() +// return &GenericParser{ +// ServiceName: serviceName, +// Hostname: hostname, +// Extractors: repo.GetExtractors(serviceName), +// CommonExt: repo.GetExtractors("common"), +// } +// } + +// func (p *GenericParser) Parse(line string) (models.LogMessage, error) { +// entry := models.LogMessage{ +// Service: p.ServiceName, +// Host: p.Hostname, +// Timestamp: time.Now(), +// Raw: line, +// Fields: make(map[string]any), +// } + +// // 1. Common Extractors laufen lassen (z.B. Syslog Header entfernen/parsen) +// // Wir nutzen eine temporäre Variable für den Rest-String, falls Header entfernt werden soll +// currentLine := line + +// // Hinweis: Hier könnte man Syslog-Logik generisch einbauen. +// // Fürs Erste wenden wir Pattern einfach auf die Zeile an. + +// // 2. Service Extractors anwenden +// // Wir probieren ALLE Extractors, um maximale Informationen zu gewinnen. +// // Das simuliert die Logik deiner alten Parser (erst Header, dann Details). +// allExtractors := append(p.CommonExt, p.Extractors...) + +// for _, ext := range allExtractors { +// matches := ext.Pattern.FindStringSubmatch(currentLine) +// if matches == nil { +// continue +// } + +// subexpNames := ext.Pattern.SubexpNames() +// for i, matchValue := range matches { +// if i == 0 || matchValue == "" { +// continue +// } + +// groupName := subexpNames[i] +// if groupName == "" { +// continue +// } + +// targetType := ext.Fields[groupName] +// parsedValue, err := convertType(matchValue, targetType) +// if err == nil { +// switch groupName { +// case "timestamp": +// if t, ok := parsedValue.(time.Time); ok { +// entry.Timestamp = t +// } +// case "log_level": +// entry.LogLevel = fmt.Sprintf("%v", parsedValue) +// case "message": +// entry.LogMessage = fmt.Sprintf("%v", parsedValue) +// default: +// entry.Fields[groupName] = parsedValue +// } +// } +// } +// } + +// if entry.LogMessage == "" { +// entry.LogMessage = strings.TrimSpace(line) +// } + +// return entry, nil +// } + +// func convertType(value, typeDef string) (any, error) { +// if strings.HasPrefix(typeDef, "int") { +// return strconv.Atoi(value) +// } +// if strings.HasPrefix(typeDef, "float") { +// return strconv.ParseFloat(value, 64) +// } +// if after, ok := strings.CutPrefix(typeDef, "time:"); ok { +// layout := after +// // Workaround für Syslog (Jahr fehlt oft), hier vereinfacht: +// if layout == "Jan 02 15:04:05" { +// t, err := time.Parse(layout, value) +// if err == nil { +// return t.AddDate(time.Now().Year(), 0, 0), nil +// } +// return t, err +// } +// return time.Parse(layout, value) +// } +// // Default: String +// return value, nil +// } +package parser + +import ( + "fmt" + "log/slog" + "strconv" + "strings" + "time" + "watch-tool/models" + "watch-tool/patterns" +) + +type GenericParser struct { + ServiceName string + Hostname string + Extractors []patterns.CompiledExtractor + CommonExt []patterns.CompiledExtractor +} + +func NewGenericParser(serviceName, hostname string) *GenericParser { + repo := patterns.GetInstance() + + var svcExt, commonExt []patterns.CompiledExtractor + if repo != nil { + svcExt = repo.GetExtractors(serviceName) + commonExt = repo.GetExtractors("common") + } else { + slog.Error("CRITICAL: Pattern Repository is nil. Parser will not work correctly.") + } + + return &GenericParser{ + ServiceName: serviceName, + Hostname: hostname, + Extractors: svcExt, + CommonExt: commonExt, + } +} + +func (p *GenericParser) Parse(line string) (models.LogMessage, error) { + entry := models.LogMessage{ + Service: p.ServiceName, + Host: p.Hostname, + Timestamp: time.Now(), + Raw: line, + Fields: make(map[string]any), + Type: "log_entry", + } + + trimmedLine := strings.TrimSpace(line) + if trimmedLine == "" { + return entry, nil + } + + allExtractors := append(p.CommonExt, p.Extractors...) + + matchedAny := false + + for _, ext := range allExtractors { + matches := ext.Pattern.FindStringSubmatch(trimmedLine) + if matches == nil { + continue + } + matchedAny = true + + subexpNames := ext.Pattern.SubexpNames() + for i, matchValue := range matches { + if i == 0 { + continue + } + + groupName := subexpNames[i] + if groupName == "" { + continue + } + + cleanValue := strings.TrimSpace(matchValue) + + targetType := ext.Fields[groupName] + parsedValue := p.safeConvert(cleanValue, targetType) + + p.mapField(&entry, groupName, parsedValue) + } + } + + if !matchedAny { + entry.LogMessage = trimmedLine + entry.Fields["_parse_status"] = "failed" + } else if entry.LogMessage == "" { + entry.LogMessage = trimmedLine + } + + return entry, nil +} + +func (p *GenericParser) safeConvert(value, typeDef string) any { + if value == "" || value == "-" { + if strings.HasPrefix(typeDef, "int") || strings.HasPrefix(typeDef, "float") { + return 0 + } + return value + } + + var err error + var result any + + switch { + case strings.HasPrefix(typeDef, "int"): + var i int + i, err = strconv.Atoi(value) + result = i + + case strings.HasPrefix(typeDef, "float"): + var f float64 + f, err = strconv.ParseFloat(value, 64) + result = f + + case strings.HasPrefix(typeDef, "time:"): + layout := strings.TrimPrefix(typeDef, "time:") + result, err = p.parseTimeRobust(value, layout) + + case typeDef == "bool": + var b bool + b, err = strconv.ParseBool(value) + result = b + + default: + return value + } + + if err != nil { + return value + } + + return result +} + +func (p *GenericParser) parseTimeRobust(value, layout string) (time.Time, error) { + if layout == "Jan 02 15:04:05" { + t, err := time.Parse(layout, value) + if err != nil { + return time.Time{}, err + } + now := time.Now() + year := now.Year() + if t.Month() > now.Month() { + year-- + } + return t.AddDate(year, 0, 0), nil + } + + return time.Parse(layout, value) +} + +func (p *GenericParser) mapField(entry *models.LogMessage, key string, value any) { + switch key { + case "timestamp", "time": + if t, ok := value.(time.Time); ok { + entry.Timestamp = t + } + case "log_level", "level": + entry.LogLevel = fmt.Sprintf("%v", value) + case "message", "msg": + entry.LogMessage = fmt.Sprintf("%v", value) + case "host", "hostname": + entry.Host = fmt.Sprintf("%v", value) + case "service": + entry.Service = fmt.Sprintf("%v", value) + case "pid": + if v, ok := value.(int); ok { + entry.PID = v + } else if vStr, ok := value.(string); ok { + if pid, err := strconv.Atoi(vStr); err == nil { + entry.PID = pid + } + } + // Mapping auf ServiceInformation Felder (Optional, falls nötig) + // case "transfer_id": ... + + default: + entry.Fields[key] = value + } +} diff --git a/parser/json_parser.go b/parser/json_parser.go index fd3527e..d83d068 100644 --- a/parser/json_parser.go +++ b/parser/json_parser.go @@ -3,7 +3,7 @@ package parser import ( "encoding/json" "log/slog" - "tixel_watch/models" + "watch-tool/models" ) type JSONParser struct{} diff --git a/parser/nginx_parser.go b/parser/nginx_parser.go index ee4da40..0bec84b 100644 --- a/parser/nginx_parser.go +++ b/parser/nginx_parser.go @@ -5,7 +5,7 @@ import ( "regexp" "strconv" "strings" - "tixel_watch/models" + "watch-tool/models" ) var ( diff --git a/parser/nginx_tjm_parser.go b/parser/nginx_tjm_parser.go index d5f4f5e..2662a49 100644 --- a/parser/nginx_tjm_parser.go +++ b/parser/nginx_tjm_parser.go @@ -4,12 +4,13 @@ import ( "log/slog" "strconv" "strings" - "tixel_watch/helpers" - "tixel_watch/models" + "watch-tool/helpers" + "watch-tool/models" ) type NginxTJMLogParser struct { ToolName string + Hostname string } func (p *NginxTJMLogParser) Parse(line string) (models.LogMessage, error) { @@ -18,11 +19,7 @@ func (p *NginxTJMLogParser) Parse(line string) (models.LogMessage, error) { Tool: p.ToolName, Raw: line, } - hostname, err := helpers.GetHostname() - if err != nil { - return entry, err - } - entry.Host = hostname + entry.Host = p.Hostname entry = p.parseNginxTJM(entry) return entry, nil } diff --git a/parser/parser.go b/parser/parser.go index 480c741..53d3c63 100644 --- a/parser/parser.go +++ b/parser/parser.go @@ -1,11 +1,9 @@ package parser import ( - "tixel_watch/models" + "watch-tool/models" ) type Parser interface { - //TODO: Change parsers to return an error as well Parse(line string) (models.LogMessage, error) - // Parse(line string) models.LogMessage } diff --git a/parser/regex_parser.go b/parser/regex_parser.go index d12a643..b4c6a82 100644 --- a/parser/regex_parser.go +++ b/parser/regex_parser.go @@ -1,29 +1,23 @@ package parser import ( - "log/slog" "regexp" "strings" - "tixel_watch/helpers" - "tixel_watch/models" + "watch-tool/models" ) type RegexLogParser struct { Pattern *regexp.Regexp Fields map[string]string Toolname string + Hostname string } func (p *RegexLogParser) Parse(line string) (models.LogMessage, error) { entry := models.LogMessage{Type: "log_entry"} entry.Tool = p.Toolname entry.Raw = line - hostname, err := helpers.GetHostname() - if err != nil { - slog.Warn("cannot get hostname") - return entry, err - } - entry.Host = hostname + entry.Host = p.Hostname fields := p.parseWithPattern(line) if fields != nil { diff --git a/parser/tcc_parser.go b/parser/tcc_parser.go index 2c117c9..1cca213 100644 --- a/parser/tcc_parser.go +++ b/parser/tcc_parser.go @@ -5,8 +5,8 @@ import ( "regexp" "strings" "time" - "tixel_watch/helpers" - "tixel_watch/models" + "watch-tool/helpers" + "watch-tool/models" ) var ( diff --git a/parser/tjm_parser.go b/parser/tjm_parser.go index 850e21c..cbf6498 100644 --- a/parser/tjm_parser.go +++ b/parser/tjm_parser.go @@ -4,8 +4,8 @@ import ( "log/slog" "regexp" "strings" - "tixel_watch/helpers" - "tixel_watch/models" + "watch-tool/helpers" + "watch-tool/models" ) var ( diff --git a/parser/ts_parser.go b/parser/ts_parser.go index 6518fcb..b8c1525 100644 --- a/parser/ts_parser.go +++ b/parser/ts_parser.go @@ -5,8 +5,8 @@ import ( "regexp" "strconv" "strings" - "tixel_watch/helpers" - "tixel_watch/models" + "watch-tool/helpers" + "watch-tool/models" ) var ( diff --git a/patterns/repository.go b/patterns/repository.go new file mode 100644 index 0000000..f8b8fd4 --- /dev/null +++ b/patterns/repository.go @@ -0,0 +1,179 @@ +// package patterns + +// import ( +// "fmt" +// "regexp" +// "sync" + +// "gopkg.in/yaml.v3" +// "os" +// ) + +// type PatternConfig struct { +// Patterns map[string]map[string]PatternDefinition `yaml:"patterns"` +// } + +// type PatternDefinition struct { +// Regex string `yaml:"regex"` +// Description string `yaml:"description,omitempty"` +// } + +// type Repository struct { +// compiledPatterns map[string]map[string]*regexp.Regexp +// mu sync.RWMutex +// } + +// var ( +// instance *Repository +// once sync.Once +// ) + +// func GetInstance() *Repository { +// once.Do(func() { +// instance = &Repository{ +// compiledPatterns: make(map[string]map[string]*regexp.Regexp), +// } +// }) +// return instance +// } + +// func (r *Repository) Load(path string) error { +// r.mu.Lock() +// defer r.mu.Unlock() + +// data, err := os.ReadFile(path) +// if err != nil { +// return fmt.Errorf("failed to read pattern config: %w", err) +// } + +// var config PatternConfig +// if err := yaml.Unmarshal(data, &config); err != nil { +// return fmt.Errorf("failed to parse pattern config: %w", err) +// } + +// for service, patterns := range config.Patterns { +// if _, exists := r.compiledPatterns[service]; !exists { +// r.compiledPatterns[service] = make(map[string]*regexp.Regexp) +// } + +// for name, def := range patterns { +// compiled, err := regexp.Compile(def.Regex) +// if err != nil { +// return fmt.Errorf("invalid regex for %s/%s: %w", service, name, err) +// } +// r.compiledPatterns[service][name] = compiled +// } +// } + +// return nil +// } + +// func (r *Repository) Get(service string, name string) (*regexp.Regexp, error) { +// r.mu.RLock() +// defer r.mu.RUnlock() + +// if svcPatterns, ok := r.compiledPatterns[service]; ok { +// if pattern, ok := svcPatterns[name]; ok { +// return pattern, nil +// } +// } + +// return nil, fmt.Errorf("pattern not found: %s/%s", service, name) +// } + +// func (r *Repository) MustGet(service string, name string) *regexp.Regexp { +// p, err := r.Get(service, name) +// if err != nil { +// panic(err) +// } +// return p +// } +package patterns + +import ( + "fmt" + "os" + "regexp" + "sync" + + "gopkg.in/yaml.v3" +) + +// Struktur der YAML Datei +type Config struct { + Patterns map[string]ServiceConfig `yaml:"patterns"` +} + +type ServiceConfig struct { + Extractors []ExtractorConfig `yaml:"extractors"` +} + +type ExtractorConfig struct { + Name string `yaml:"name"` + Regex string `yaml:"regex"` + Fields map[string]string `yaml:"fields"` // Name -> Typ (int, float, string) +} + +// Interne kompilierte Struktur +type CompiledExtractor struct { + Name string + Pattern *regexp.Regexp + Fields map[string]string +} + +type Repository struct { + services map[string][]CompiledExtractor + mu sync.RWMutex +} + +var ( + instance *Repository + once sync.Once +) + +func GetInstance() *Repository { + once.Do(func() { + instance = &Repository{ + services: make(map[string][]CompiledExtractor), + } + }) + return instance +} + +func (r *Repository) Load(path string) error { + r.mu.Lock() + defer r.mu.Unlock() + + data, err := os.ReadFile(path) + if err != nil { + return fmt.Errorf("failed to read patterns file: %w", err) + } + + var cfg Config + if err := yaml.Unmarshal(data, &cfg); err != nil { + return fmt.Errorf("failed to parse yaml: %w", err) + } + + for service, svcCfg := range cfg.Patterns { + var compiledList []CompiledExtractor + for _, ext := range svcCfg.Extractors { + re, err := regexp.Compile(ext.Regex) + if err != nil { + return fmt.Errorf("invalid regex in service %s extractor %s: %w", service, ext.Name, err) + } + compiledList = append(compiledList, CompiledExtractor{ + Name: ext.Name, + Pattern: re, + Fields: ext.Fields, + }) + } + r.services[service] = compiledList + } + return nil +} + +func (r *Repository) GetExtractors(service string) []CompiledExtractor { + r.mu.RLock() + defer r.mu.RUnlock() + return r.services[service] +} diff --git a/service_monitor.go b/service_monitor.go index 61a7021..2320130 100644 --- a/service_monitor.go +++ b/service_monitor.go @@ -11,17 +11,19 @@ import ( "strconv" "strings" "time" - "tixel_watch/models" - "tixel_watch/parser" + "watch-tool/models" + "watch-tool/parser" ) type ServiceMonitor struct { - config ServiceConfig + config ServiceConfig + hostname string } -func NewServiceMonitor(config ServiceConfig) *ServiceMonitor { +func NewServiceMonitor(config ServiceConfig, hostname string) *ServiceMonitor { return &ServiceMonitor{ - config: config, + config: config, + hostname: hostname, } } @@ -50,7 +52,7 @@ func (sm *ServiceMonitor) Start(ctx context.Context, out chan<- models.LogMessag } }() - parser := NewJournalEntryParser(sm.config.Name, sm.config.Service) + parser := NewJournalEntryParser(sm.config.Name, sm.config.Service, sm.hostname) for scanner.Scan() { select { @@ -109,12 +111,14 @@ func (sm *ServiceMonitor) buildJournalctlArgs() []string { type JournalEntryParser struct { serviceName string unitName string + hostname string } -func NewJournalEntryParser(serviceName, unitName string) *JournalEntryParser { +func NewJournalEntryParser(serviceName, unitName, hostname string) *JournalEntryParser { return &JournalEntryParser{ serviceName: serviceName, unitName: unitName, + hostname: hostname, } } @@ -124,7 +128,7 @@ func (jep *JournalEntryParser) Parse(jsonLine string) (models.LogMessage, error) return models.LogMessage{}, fmt.Errorf("JSON unmarshal error: %w", err) } - entry := models.NewLogMessage("service_log", hostname) + entry := models.NewLogMessage("service_log", jep.hostname) entry.Service = jep.serviceName entry.Unit = jep.unitName @@ -211,7 +215,7 @@ func (jep *JournalEntryParser) extractSystemdFields(journalData map[string]any, } func (jep *JournalEntryParser) parseServiceSpecific(entry models.LogMessage) models.LogMessage { - logParser, err := parser.New(jep.serviceName, "custom") + logParser, err := parser.New(jep.serviceName, "custom", jep.hostname) if err != nil { slog.Error("cannot get service specific parser") return entry @@ -235,284 +239,3 @@ var ( tsDetailPattern4 = regexp.MustCompile(`out: Start transfer (?P\d+/\d+), src=(?P[^ ]*) dest=(?P[^ ]*) item\[0\]=(?P[^ ]*) count=(?P\d+)`) nginxAccessPattern = regexp.MustCompile(`^(\S+)\s+\S+\s+(\S+)\s+\[([^\]]+)\]\s+"([^"]+)"\s+(\d+)\s+(\d+|-)\s*(?:"([^"]*)"\s+"([^"]*)")?`) ) - -// func parseTixstreamService(entry models.LogMessage) models.LogMessage { -// newEntry := entry -// var baseInfo models.TSTransferInfo - -// matches := tsServicePattern.FindStringSubmatch(newEntry.LogMessage) -// if len(matches) > 0 { -// timestamp := strings.Join(strings.Split(matches[2], " "), "T") -// newEntry.LogLevel = strings.TrimSpace(matches[1]) -// if newEntry.Timestamp.IsZero() { -// timeParsed, err := parseRFC3339WithOptionalZ(timestamp) -// if err != nil { -// slog.Error("cant parse time string", "error", err) -// } -// newEntry.Timestamp = timeParsed -// } -// newEntry.LogMessage = strings.TrimSpace(matches[3]) -// } -// trNameMatch := tsTransferIDPattern.FindStringSubmatch(newEntry.LogMessage) -// var transferID string -// if len(trNameMatch) > 0 { -// transferID = trNameMatch[1] -// newEntry.LogMessage = trNameMatch[2] -// split := strings.Fields(trNameMatch[2]) -// switch split[0] { -// case "in:": -// baseInfo.Direction = "incoming" -// case "out:": -// baseInfo.Direction = "outgoing" -// } -// } - -// msg := strings.ReplaceAll(newEntry.LogMessage, " ", " ") -// parts := strings.Fields(msg) - -// if len(parts) < 5 { -// return newEntry -// } -// tsDetail := tsDetailPattern1.FindStringSubmatch(newEntry.LogMessage) -// if len(tsDetail) > 0 { -// threadInt, _ := strconv.Atoi(strings.Split(tsDetail[1], "/")[0]) -// buffersInt, _ := strconv.Atoi(tsDetail[2]) -// fileCountInt, _ := strconv.Atoi(tsDetail[3]) -// chunkSizeInt, _ := strconv.Atoi(tsDetail[5]) -// streamsInt, _ := strconv.Atoi(tsDetail[6]) -// datarateFloat, _ := strconv.ParseFloat(tsDetail[7], 64) -// fileSizeFloat, _ := strconv.ParseFloat(tsDetail[4], 64) -// baseInfo.Lane = threadInt -// baseInfo.Buffers = buffersInt -// baseInfo.FileCount = fileCountInt -// baseInfo.FileSizeMB = fileSizeFloat -// baseInfo.ChunkSize = chunkSizeInt -// baseInfo.Streams = streamsInt -// baseInfo.TargetDatarate = datarateFloat -// baseInfo.Protocoll = tsDetail[8] -// baseInfo.Dest = tsDetail[9] -// baseInfo.SenderID = tsDetail[10] -// } -// tsDetail = tsDetailPattern2.FindStringSubmatch(newEntry.LogMessage) -// if len(tsDetail) > 0 { -// baseInfo.Target = tsDetail[1] -// } -// tsDetail = tsDetailPattern3.FindStringSubmatch(newEntry.LogMessage) -// if len(tsDetail) > 0 { -// threadInt, _ := strconv.Atoi(strings.Split(tsDetail[1], "/")[0]) -// buffersInt, _ := strconv.Atoi(tsDetail[2]) -// fileCountInt, _ := strconv.Atoi(tsDetail[3]) -// fileSizeFloat, _ := strconv.ParseFloat(tsDetail[4], 64) -// chunkSizeInt, _ := strconv.Atoi(tsDetail[5]) -// streamsInt, _ := strconv.Atoi(tsDetail[6]) -// datarateFloat, _ := strconv.ParseFloat(tsDetail[7], 64) -// baseInfo.Lane = threadInt -// baseInfo.Buffers = buffersInt -// baseInfo.FileCount = fileCountInt -// baseInfo.FileSizeMB = fileSizeFloat -// baseInfo.ChunkSize = chunkSizeInt -// baseInfo.Streams = streamsInt -// baseInfo.TargetDatarate = datarateFloat -// baseInfo.Protocoll = tsDetail[8] -// baseInfo.Src = tsDetail[9] -// baseInfo.Receiver = tsDetail[10] -// } -// tsDetail = tsDetailPattern4.FindStringSubmatch(newEntry.LogMessage) -// if len(tsDetail) > 0 { -// threadInt, _ := strconv.Atoi(strings.Split(tsDetail[1], "/")[0]) -// baseInfo.Lane = threadInt -// baseInfo.Src = tsDetail[2] -// baseInfo.Dest = tsDetail[3] -// } -// if strings.Contains(newEntry.LogMessage, "Transfer start") || strings.Contains(newEntry.LogMessage, "Transfer started,") { -// baseInfo.StartTime = newEntry.Timestamp -// } else { -// baseInfo.StartTime = time.Now() -// } -// if strings.Contains(newEntry.LogMessage, "Transfer stopped local state=finished") { -// baseInfo.EndTime = newEntry.Timestamp -// } else { -// baseInfo.EndTime = baseInfo.StartTime -// } -// if transferID != "" { -// baseInfo.TransferID = transferID -// } else { -// baseInfo.TransferID = "no_transfer_id" -// } -// newEntry.ServiceInformation = baseInfo -// return newEntry -// } - -// func parseTJMService(entry models.LogMessage) models.LogMessage { -// newEntry := entry -// var baseInfo models.TJMTransferInfo - -// logContent := entry.LogMessage -// msg := strings.TrimSpace(logContent) -// msg = strings.ReplaceAll(msg, " ", " ") -// msg = strings.ReplaceAll(msg, "---", "") -// msg = strings.ReplaceAll(msg, " ", " ") -// parts := strings.Fields(msg) -// if len(parts) < 4 { -// return newEntry -// } -// matches := tjmServicePattern.FindStringSubmatch(logContent) -// if len(matches) > 0 { -// timestamp := strings.Join(strings.Split(matches[2], " "), "T") -// newEntry.LogLevel = strings.TrimSpace(matches[1]) -// if newEntry.Timestamp.IsZero() { -// timeParsed, err := parseRFC3339WithOptionalZ(timestamp) -// if err != nil { -// slog.Error("cant parse time string", "error", err) -// } -// newEntry.Timestamp = timeParsed -// } -// newEntry.LogLevel = strings.TrimSpace(matches[2]) -// newEntry.LogMessage = strings.TrimSpace(matches[8]) -// baseInfo = models.TJMTransferInfo{ -// ProcessID: strings.TrimSpace(matches[3]), -// CorrelationID: strings.TrimSpace(matches[4]), -// Username: strings.TrimSpace(matches[5]), -// ThreadID: strings.TrimSpace(matches[6]), -// JavaClass: strings.TrimSpace(matches[7]), -// } -// } else { -// newEntry.LogMessage = logContent -// } -// trNameMatch := tjmTransferNamePattern.FindStringSubmatch(newEntry.LogMessage) -// var transferName string -// var transferID string -// if len(trNameMatch) > 0 { -// transferName = trNameMatch[1] -// newEntry.LogMessage = trNameMatch[2] -// if strings.Contains(trNameMatch[1], "-in") { -// baseInfo.Direction = "incoming" -// } -// if strings.Contains(trNameMatch[1], "-out") { -// baseInfo.Direction = "outgoing" -// } -// } -// trIDMatch := tjmTransferIDPattern1.FindStringSubmatch(newEntry.LogMessage) -// if len(trIDMatch) > 0 { -// transferID = trIDMatch[1] -// } -// trIDMatch = tjmTransferIDPattern2.FindStringSubmatch(newEntry.LogMessage) -// if len(trIDMatch) > 0 { -// transferID = trIDMatch[2] -// } -// if transferID != "" { -// baseInfo.TransferID = transferID -// } else if transferName != "" { -// baseInfo.TransferID = transferName -// } else { -// baseInfo.TransferID = "no_transfer_id" -// } -// baseInfo.StartTime = newEntry.Timestamp -// baseInfo.StartTime = newEntry.Timestamp -// newEntry.ServiceInformation = baseInfo - -// return newEntry -// } - -// func parseAMService(entry models.LogMessage) models.LogMessage { -// newEntry := entry -// logContent := newEntry.LogMessage - -// matches := amServicePattern.FindStringSubmatch(strings.TrimSpace(logContent)) -// if len(matches) != 7 { -// return newEntry -// } -// timestampStr := strings.Join(strings.Split(matches[1], " "), "T") -// if newEntry.Timestamp.IsZero() { -// timeParsed, err := parseRFC3339WithOptionalZ(timestampStr) -// if err != nil { -// slog.Error("cant parse time string", "error", err) -// } -// newEntry.Timestamp = timeParsed -// } -// baseInfo := models.AMBaseInfo{ -// ProcessID: matches[3], -// ThreadID: strings.TrimSpace(matches[4]), -// LoggerName: matches[5], -// } -// newEntry.LogLevel = matches[2] -// newEntry.LogMessage = matches[6] -// newEntry.ServiceInformation = baseInfo - -// return newEntry -// } - -// func parseTCCService(entry models.LogMessage) models.LogMessage { -// newEntry := entry -// logContent := newEntry.LogMessage - -// matches := tccServicePattern.FindStringSubmatch(logContent) -// if len(matches) != 7 { -// return newEntry -// } -// timestampStr := strings.Join(strings.Split(matches[1], " "), "T") -// if newEntry.Timestamp.IsZero() { -// timeParsed, err := parseRFC3339WithOptionalZ(timestampStr) -// if err != nil { -// slog.Error("cant parse time string", "error", err) -// } -// newEntry.Timestamp = timeParsed -// } -// baseInfo := models.TCCBaseInfo{ -// ProcessID: matches[3], -// ThreadID: strings.TrimSpace(matches[4]), -// LoggerName: matches[5], -// } -// newEntry.LogLevel = matches[2] -// newEntry.LogMessage = matches[6] -// newEntry.ServiceInformation = baseInfo - -// return newEntry -// } - -// func parseNginxService(entry models.LogMessage) models.LogMessage { -// newEntry := entry - -// matches := nginxAccessPattern.FindStringSubmatch(strings.TrimSpace(entry.LogMessage)) -// if len(matches) < 7 { -// return newEntry -// } -// statusCode, err := strconv.ParseInt(matches[5], 10, 64) -// if err != nil { -// slog.Error("cant parse statuscode", "error", err) -// } -// bytesSend, err := strconv.ParseInt(matches[6], 10, 64) -// if err != nil { -// slog.Error("cant parse bytessend", "error", err) -// } -// baseInfo := models.NGinXBaseInfo{ -// ClientIP: matches[1], -// RemoteUser: matches[2], -// Request: matches[4], -// StatusCode: int(statusCode), -// BytesSend: int(bytesSend), -// } - -// if len(matches) > 7 && matches[7] != "" { -// baseInfo.Referer = matches[7] -// } -// if len(matches) > 8 && matches[8] != "" { -// baseInfo.UserAgent = matches[8] -// } - -// if requestParts := strings.Fields(matches[4]); len(requestParts) >= 3 { -// baseInfo.HTTPMethod = requestParts[0] -// baseInfo.RequestURI = requestParts[1] -// baseInfo.HTTPVersion = requestParts[2] -// } -// newEntry.ServiceInformation = baseInfo - -// return newEntry -// } - -// func parseRFC3339WithOptionalZ(timeStr string) (time.Time, error) { -// if !strings.HasSuffix(timeStr, "Z") && !strings.ContainsAny(timeStr[len(timeStr)-6:], "+-") { -// timeStr += "Z" -// } -// return time.Parse(time.RFC3339Nano, timeStr) -// } diff --git a/storage_interface.go b/storage_interface.go index b7cfa51..282ed63 100644 --- a/storage_interface.go +++ b/storage_interface.go @@ -3,7 +3,7 @@ package main import ( "context" "time" - "tixel_watch/models" + "watch-tool/models" ) type StorageInterface interface { diff --git a/system_metrics.go b/system_metrics.go index e5e8eda..79a6d6e 100644 --- a/system_metrics.go +++ b/system_metrics.go @@ -12,9 +12,8 @@ import ( "strings" "syscall" "time" - "tixel_watch/models" + "watch-tool/models" - "github.com/elastic/go-elasticsearch/v8" "github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/disk" "github.com/shirou/gopsutil/host" @@ -31,24 +30,24 @@ type SystemMetricsCollector struct { lastNetworkStats map[string]models.NetworkStat lastDiskStats map[string]models.DiskIOStat lastMeasureTime time.Time + hostname string } -func NewSystemMetricsCollector(config SystemMetrics, pollInterval int) *SystemMetricsCollector { +func NewSystemMetricsCollector(config SystemMetrics, pollInterval int, hostname string) *SystemMetricsCollector { return &SystemMetricsCollector{ config: config, pollInterval: pollInterval, lastNetworkStats: make(map[string]models.NetworkStat), lastDiskStats: make(map[string]models.DiskIOStat), lastMeasureTime: time.Now(), + hostname: hostname, } } -func (smc *SystemMetricsCollector) Start(ctx context.Context, es *elasticsearch.Client, baseIndex string) { +func (smc *SystemMetricsCollector) Start(ctx context.Context, storage StorageInterface, logChan chan<- models.LogMessage) { ticker := time.NewTicker(time.Duration(smc.pollInterval) * time.Second) defer ticker.Stop() - sender := NewElasticsearchSender(es) - for { select { case <-ctx.Done(): @@ -61,15 +60,23 @@ func (smc *SystemMetricsCollector) Start(ctx context.Context, es *elasticsearch. continue } - if err := sender.SendSystemMetrics(baseIndex, metrics); err != nil { - slog.Error("error sending system metrics", "error", err) + entry := models.NewLogMessage("system_metrics", smc.hostname) + entry.Service = "system-metrics" + entry.LogLevel = "Info" + entry.SystemMetrics = metrics + + select { + case logChan <- entry: + case <-ctx.Done(): + return + default: + slog.Warn("Log channel is full, system metrics dropped") } } } } - func (smc *SystemMetricsCollector) collectMetrics() (models.SystemResources, error) { - result := models.NewSystemResources(hostname) + result := models.NewSystemResources(smc.hostname) var err error diff --git a/system_metricsV2.go b/system_metricsV2.go deleted file mode 100644 index ddedb29..0000000 --- a/system_metricsV2.go +++ /dev/null @@ -1,40 +0,0 @@ -package main - -import ( - "context" - "log/slog" - "time" - "tixel_watch/models" -) - -func (smc *SystemMetricsCollector) StartV2(ctx context.Context, storage StorageInterface, logChan chan<- models.LogMessage) { - ticker := time.NewTicker(time.Duration(smc.pollInterval) * time.Second) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - slog.Info("System metrics collector stopped") - return - case <-ticker.C: - metrics, err := smc.collectMetrics() - if err != nil { - slog.Error("error collecting system metrics", "error", err) - continue - } - - entry := models.NewLogMessage("system_metrics", hostname) - entry.Service = "system-metrics" - entry.LogLevel = "Info" - entry.SystemMetrics = metrics - - select { - case logChan <- entry: - case <-ctx.Done(): - return - default: - slog.Warn("Log channel is full, system metrics dropped") - } - } - } -} diff --git a/web_service.go b/web_service.go index 8669489..64ced15 100644 --- a/web_service.go +++ b/web_service.go @@ -7,35 +7,46 @@ import ( "log/slog" "net/http" "os/exec" + "slices" "strconv" "strings" "time" - - "github.com/elastic/go-elasticsearch/v8" + "watch-tool/models" ) type WebService struct { - server *http.Server - esClient *elasticsearch.Client - config *Config + server *http.Server + storage StorageInterface + config *Config } -func NewWebService(config *Config, esClient *elasticsearch.Client) *WebService { +func LoggingMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + slog.Debug("WebService", "Remote-Address", r.RemoteAddr, "Method", r.Method, "Path", r.URL.Path) + next.ServeHTTP(w, r) + }) +} + +func NewWebService(config *Config, storage StorageInterface) *WebService { mux := http.NewServeMux() ws := &WebService{ - esClient: esClient, - config: config, + storage: storage, + config: config, } - mux.HandleFunc("/export", ws.handleExport) - mux.HandleFunc("/health", ws.handleHealth) - mux.HandleFunc("/indices", ws.handleIndices) + mux.HandleFunc("GET /health", ws.handleHealth) + mux.HandleFunc("GET /logs", ws.handleLogs) + mux.HandleFunc("GET /export", ws.handleExport) + mux.HandleFunc("GET /stats", ws.handleStats) + mux.HandleFunc("GET /stats/{service}", ws.handleServiceStats) + + loggedMux := LoggingMiddleware(mux) addr := fmt.Sprintf("%s:%d", config.WebService.Host, config.WebService.Port) ws.server = &http.Server{ Addr: addr, - Handler: mux, + Handler: loggedMux, ReadTimeout: 30 * time.Second, WriteTimeout: 300 * time.Second, IdleTimeout: 60 * time.Second, @@ -64,67 +75,30 @@ func (ws *WebService) Start(ctx context.Context) error { return nil } -func (ws *WebService) handleExport(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) - return - } - - indices := ws.parseIndicesParam(r) - if len(indices) == 0 { - http.Error(w, "No indices specified. Use ?indices=index1,index2", http.StatusBadRequest) - return - } - - size := ws.parseSizeParam(r) - since := ws.parseSinceParam(r) - - slog.Info("Export request received", "indices", indices, "size", size, "since", since) - - w.Header().Set("Content-Type", "application/json") - w.Header().Set("Content-Disposition", "attachment; filename=elasticsearch_export.json") - - exporter := NewElasticsearchExporter(ws.esClient) - if err := exporter.ExportToStream(r.Context(), indices, size, since, w); err != nil { - slog.Error("export error", "error", err) - http.Error(w, fmt.Sprintf("Export error: %v", err), http.StatusInternalServerError) - return - } - - slog.Info("Export completed successfully", "indices", indices) -} - func (ws *WebService) handleHealth(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) - return + + status := map[string]any{ + "status": "healthy", + "timestamp": time.Now(), + "storage": "sqlite", } ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() - res, err := ws.esClient.Info(ws.esClient.Info.WithContext(ctx)) + _, err := ws.storage.Query(ctx, StorageQuery{ + Limit: 1, + }) + if err != nil { + status["storage_status"] = "unhealthy" + status["storage_error"] = err.Error() w.WriteHeader(http.StatusServiceUnavailable) - json.NewEncoder(w).Encode(map[string]any{ - "status": "unhealthy", - "error": err.Error(), - }) - return + } else { + status["storage_status"] = "healthy" } - res.Body.Close() - - if res.IsError() { - w.WriteHeader(http.StatusServiceUnavailable) - json.NewEncoder(w).Encode(map[string]any{ - "status": "unhealthy", - "error": res.String(), - }) - return - } - statusMap := make(map[string]any) - statusMap["elasticsearch"] = map[string]any{"status": "healthy", "timestamp": time.Now()} + statusMap["tixel-watch"] = status for _, service := range ws.config.Services { statusCommand := []string{"sudo", "systemctl", "status", service.Name, "--no-pager"} @@ -150,89 +124,267 @@ func (ws *WebService) handleHealth(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(statusMap) } -func (ws *WebService) handleIndices(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) +func (ws *WebService) handleLogs(w http.ResponseWriter, r *http.Request) { + + query := ws.parseLogsQuery(r) + + ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second) + defer cancel() + + entries, err := ws.storage.Query(ctx, query) + if err != nil { + http.Error(w, fmt.Sprintf("Query error: %v", err), http.StatusInternalServerError) return } + response := map[string]any{ + "entries": entries, + "count": len(entries), + "query": query, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +func (ws *WebService) handleExport(w http.ResponseWriter, r *http.Request) { + + query := ws.parseLogsQuery(r) + + ctx, cancel := context.WithTimeout(r.Context(), 300*time.Second) + defer cancel() + + entries, err := ws.storage.Query(ctx, query) + if err != nil { + http.Error(w, fmt.Sprintf("Export query error: %v", err), http.StatusInternalServerError) + return + } + + filename := fmt.Sprintf("tixel_export_%s.json", time.Now().Format("20060102_150405")) + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", filename)) + + exportData := map[string]any{ + "export_info": map[string]any{ + "timestamp": time.Now(), + "entry_count": len(entries), + "query": query, + "exported_by": "tixel-watch", + }, + "entries": entries, + } + + if err := json.NewEncoder(w).Encode(exportData); err != nil { + slog.Error("Failed to encode export data", "error", err) + return + } + + slog.Info("Data exported", "count", len(entries), "query", query) +} + +func (ws *WebService) handleStats(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) defer cancel() - res, err := ws.esClient.Cat.Indices( - ws.esClient.Cat.Indices.WithContext(ctx), - ws.esClient.Cat.Indices.WithFormat("json"), - ) + allEntries, err := ws.storage.Query(ctx, StorageQuery{}) if err != nil { - http.Error(w, fmt.Sprintf("Error fetching indices: %v", err), http.StatusInternalServerError) - return - } - defer res.Body.Close() - - if res.IsError() { - http.Error(w, fmt.Sprintf("Elasticsearch error: %s", res.String()), http.StatusInternalServerError) + http.Error(w, fmt.Sprintf("Stats query error: %v", err), http.StatusInternalServerError) return } - var indices []map[string]any - if err := json.NewDecoder(res.Body).Decode(&indices); err != nil { - http.Error(w, fmt.Sprintf("Error decoding response: %v", err), http.StatusInternalServerError) - return - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(map[string]any{ - "indices": indices, - "count": len(indices), + recentEntries, err := ws.storage.Query(ctx, StorageQuery{ + StartTime: time.Now().Add(-time.Hour), }) -} - -func (ws *WebService) parseIndicesParam(r *http.Request) []string { - indicesParam := r.URL.Query().Get("indices") - if indicesParam == "" { - return nil + if err != nil { + slog.Error("Failed to query recent entries", "error", err) + recentEntries = []models.LogMessage{} } - indices := strings.Split(indicesParam, ",") - var result []string - for _, index := range indices { - index = strings.TrimSpace(index) - if index != "" { - result = append(result, index) + stats := map[string]any{ + "total_entries": len(allEntries), + "recent_entries": len(recentEntries), + "timestamp": time.Now(), + } + + typeCounts := make(map[string]int) + serviceCounts := make(map[string]int) + toolCounts := make(map[string]int) + + for _, entry := range allEntries { + typeCounts[entry.Type]++ + if entry.Service != "" { + serviceCounts[entry.Service]++ + } + if entry.Tool != "" { + toolCounts[entry.Tool]++ } } - return result + stats["by_type"] = typeCounts + stats["by_service"] = serviceCounts + stats["by_tool"] = toolCounts + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(stats) } -func (ws *WebService) parseSinceParam(r *http.Request) int { - sinceParam := r.URL.Query().Get("since") - if sinceParam == "" { - return 0 +func (ws *WebService) handleServiceStats(w http.ResponseWriter, r *http.Request) { + service := r.PathValue("service") + if service == "" { + http.Error(w, "Service parameter is missing", http.StatusBadRequest) + return } - since, err := strconv.Atoi(sinceParam) + timeRangeStr := r.URL.Query().Get("time_range") + var startTime time.Time + if timeRangeStr == "" { + startTime = time.Now().Add(-24 * time.Hour) + } else { + duration, err := time.ParseDuration(timeRangeStr) + if err != nil { + http.Error(w, fmt.Sprintf("Invalid time_range: %v", err), http.StatusBadRequest) + return + } + startTime = time.Now().Add(-duration) + } + + query := StorageQuery{ + Service: service, + StartTime: startTime, + Limit: 0, + OrderDesc: false, + } + + ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second) + defer cancel() + + entries, err := ws.storage.Query(ctx, query) if err != nil { - return 0 + slog.Error("Failed to query service stats", "service", service, "error", err) + http.Error(w, fmt.Sprintf("Query error: %v", err), http.StatusInternalServerError) + return + } + uniqueTransfersTotal := make(map[string]struct{}) + uniqueTransfersIncoming := make(map[string]struct{}) + uniqueTransfersOutgoing := make(map[string]struct{}) + uniqueTransfersNil := make(map[string]struct{}) + + for _, entry := range entries { + var identifier string + var direction string + + switch v := entry.ServiceInformation.(type) { + case models.TSTransferInfo: + identifier = v.TransferID + direction = v.Direction + case *models.TSTransferInfo: + identifier = v.TransferID + direction = v.Direction + case models.TJMTransferInfo: + identifier = v.TransferID + direction = v.Direction + case *models.TJMTransferInfo: + identifier = v.TransferID + direction = v.Direction + case map[string]any: + identifier, _ = v["transfer_identifier"].(string) + direction, _ = v["direction"].(string) + default: + continue + } + + if identifier != "" { + uniqueTransfersTotal[identifier] = struct{}{} + + switch strings.ToLower(direction) { + case "incoming": + uniqueTransfersIncoming[identifier] = struct{}{} + case "outgoing": + uniqueTransfersOutgoing[identifier] = struct{}{} + default: + uniqueTransfersNil[identifier] = struct{}{} + } + } } - return since + stats := map[string]any{ + "service": service, + "start_time": startTime, + "end_time": time.Now(), + "transfer_counts": map[string]any{ + "total": len(uniqueTransfersTotal), + "incoming": len(uniqueTransfersIncoming), + "outgoing": len(uniqueTransfersOutgoing), + "nil_or_unknown_direction": len(uniqueTransfersNil), + }, + "entry_count": len(entries), + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(stats) } -func (ws *WebService) parseSizeParam(r *http.Request) int { - sizeParam := r.URL.Query().Get("size") - if sizeParam == "" { - return 1000 +func (ws *WebService) parseLogsQuery(r *http.Request) StorageQuery { + query := StorageQuery{ + Limit: 100, + OrderBy: "timestamp", + OrderDesc: true, } - size, err := strconv.Atoi(sizeParam) - if err != nil || size <= 0 { - return 1000 + if limitStr := r.URL.Query().Get("limit"); limitStr != "" { + if limit, err := strconv.Atoi(limitStr); err == nil && limit > 0 { + if limit > 10000 { + limit = 10000 + } + query.Limit = limit + } } - if size > 10000 { - size = 10000 + if offsetStr := r.URL.Query().Get("offset"); offsetStr != "" { + if offset, err := strconv.Atoi(offsetStr); err == nil && offset >= 0 { + query.Offset = offset + } } - return size + if startTime := r.URL.Query().Get("start_time"); startTime != "" { + if t, err := time.Parse(time.RFC3339, startTime); err == nil { + query.StartTime = t + } + } + + if endTime := r.URL.Query().Get("end_time"); endTime != "" { + if t, err := time.Parse(time.RFC3339, endTime); err == nil { + query.EndTime = t + } + } + + if service := r.URL.Query().Get("service"); service != "" { + query.Service = strings.TrimSpace(service) + } + + if tool := r.URL.Query().Get("tool"); tool != "" { + query.Tool = strings.TrimSpace(tool) + } + + if logLevel := r.URL.Query().Get("log_level"); logLevel != "" { + query.LogLevel = strings.TrimSpace(logLevel) + } + + if entryType := r.URL.Query().Get("type"); entryType != "" { + query.Type = strings.TrimSpace(entryType) + } + + if orderBy := r.URL.Query().Get("order_by"); orderBy != "" { + validFields := []string{"timestamp", "service", "tool", "type", "log_level"} + if slices.Contains(validFields, orderBy) { + query.OrderBy = orderBy + } + } + + if orderDesc := r.URL.Query().Get("order_desc"); orderDesc != "" { + query.OrderDesc = orderDesc == "true" + } + + return query } diff --git a/web_serviceV2.go b/web_serviceV2.go deleted file mode 100644 index cf8c64f..0000000 --- a/web_serviceV2.go +++ /dev/null @@ -1,390 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "fmt" - "log/slog" - "net/http" - "os/exec" - "slices" - "strconv" - "strings" - "time" - "tixel_watch/models" -) - -type WebServiceV2 struct { - server *http.Server - storage StorageInterface - config *Config -} - -func LoggingMiddleware(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - slog.Debug("WebService", "Remote-Address", r.RemoteAddr, "Method", r.Method, "Path", r.URL.Path) - next.ServeHTTP(w, r) - }) -} - -func NewWebServiceV2(config *Config, storage StorageInterface) *WebServiceV2 { - mux := http.NewServeMux() - - ws := &WebServiceV2{ - storage: storage, - config: config, - } - - mux.HandleFunc("GET /health", ws.handleHealth) - mux.HandleFunc("GET /logs", ws.handleLogs) - mux.HandleFunc("GET /export", ws.handleExport) - mux.HandleFunc("GET /stats", ws.handleStats) - mux.HandleFunc("GET /stats/{service}", ws.handleServiceStats) - - loggedMux := LoggingMiddleware(mux) - - addr := fmt.Sprintf("%s:%d", config.WebService.Host, config.WebService.Port) - ws.server = &http.Server{ - Addr: addr, - Handler: loggedMux, - ReadTimeout: 30 * time.Second, - WriteTimeout: 300 * time.Second, - IdleTimeout: 60 * time.Second, - } - - return ws -} - -func (ws *WebServiceV2) Start(ctx context.Context) error { - go func() { - <-ctx.Done() - shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - if err := ws.server.Shutdown(shutdownCtx); err != nil { - slog.Error("web service shutdown error", "error", err) - } - }() - - slog.Info("Starting web service", "address", ws.server.Addr) - - if err := ws.server.ListenAndServe(); err != http.ErrServerClosed { - return fmt.Errorf("web service error: %w", err) - } - - return nil -} - -func (ws *WebServiceV2) handleHealth(w http.ResponseWriter, r *http.Request) { - - status := map[string]any{ - "status": "healthy", - "timestamp": time.Now(), - "storage": "sqlite", - } - - ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) - defer cancel() - - _, err := ws.storage.Query(ctx, StorageQuery{ - Limit: 1, - }) - - if err != nil { - status["storage_status"] = "unhealthy" - status["storage_error"] = err.Error() - w.WriteHeader(http.StatusServiceUnavailable) - } else { - status["storage_status"] = "healthy" - } - statusMap := make(map[string]any) - statusMap["tixel-watch"] = status - - for _, service := range ws.config.Services { - statusCommand := []string{"sudo", "systemctl", "status", service.Name, "--no-pager"} - if service.Enabled { - serviceStatus, err := exec.Command(statusCommand[0], statusCommand[1:]...).Output() - if err != nil { - slog.Error("error executing status command", "error", err) - continue - } - lines := strings.SplitSeq(string(serviceStatus), "\n") - for line := range lines { - if strings.Contains(line, "Active:") { - serviceHealth, found := strings.CutPrefix(strings.TrimSpace(line), "Active:") - if found { - statusMap[service.Name] = map[string]any{"status": serviceHealth, "timestamp": time.Now()} - } - } - } - } - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(statusMap) -} - -func (ws *WebServiceV2) handleLogs(w http.ResponseWriter, r *http.Request) { - - query := ws.parseLogsQuery(r) - - ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second) - defer cancel() - - entries, err := ws.storage.Query(ctx, query) - if err != nil { - http.Error(w, fmt.Sprintf("Query error: %v", err), http.StatusInternalServerError) - return - } - - response := map[string]any{ - "entries": entries, - "count": len(entries), - "query": query, - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(response) -} - -func (ws *WebServiceV2) handleExport(w http.ResponseWriter, r *http.Request) { - - query := ws.parseLogsQuery(r) - - ctx, cancel := context.WithTimeout(r.Context(), 300*time.Second) - defer cancel() - - entries, err := ws.storage.Query(ctx, query) - if err != nil { - http.Error(w, fmt.Sprintf("Export query error: %v", err), http.StatusInternalServerError) - return - } - - filename := fmt.Sprintf("tixel_export_%s.json", time.Now().Format("20060102_150405")) - w.Header().Set("Content-Type", "application/json") - w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", filename)) - - exportData := map[string]any{ - "export_info": map[string]any{ - "timestamp": time.Now(), - "entry_count": len(entries), - "query": query, - "exported_by": "tixel-watch", - }, - "entries": entries, - } - - if err := json.NewEncoder(w).Encode(exportData); err != nil { - slog.Error("Failed to encode export data", "error", err) - return - } - - slog.Info("Data exported", "count", len(entries), "query", query) -} - -func (ws *WebServiceV2) handleStats(w http.ResponseWriter, r *http.Request) { - - ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) - defer cancel() - - allEntries, err := ws.storage.Query(ctx, StorageQuery{}) - if err != nil { - http.Error(w, fmt.Sprintf("Stats query error: %v", err), http.StatusInternalServerError) - return - } - - recentEntries, err := ws.storage.Query(ctx, StorageQuery{ - StartTime: time.Now().Add(-time.Hour), - }) - if err != nil { - slog.Error("Failed to query recent entries", "error", err) - recentEntries = []models.LogMessage{} - } - - stats := map[string]any{ - "total_entries": len(allEntries), - "recent_entries": len(recentEntries), - "timestamp": time.Now(), - } - - typeCounts := make(map[string]int) - serviceCounts := make(map[string]int) - toolCounts := make(map[string]int) - - for _, entry := range allEntries { - typeCounts[entry.Type]++ - if entry.Service != "" { - serviceCounts[entry.Service]++ - } - if entry.Tool != "" { - toolCounts[entry.Tool]++ - } - } - - stats["by_type"] = typeCounts - stats["by_service"] = serviceCounts - stats["by_tool"] = toolCounts - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(stats) -} - -func (ws *WebServiceV2) handleServiceStats(w http.ResponseWriter, r *http.Request) { - service := r.PathValue("service") - if service == "" { - http.Error(w, "Service parameter is missing", http.StatusBadRequest) - return - } - - timeRangeStr := r.URL.Query().Get("time_range") - var startTime time.Time - if timeRangeStr == "" { - startTime = time.Now().Add(-24 * time.Hour) - } else { - duration, err := time.ParseDuration(timeRangeStr) - if err != nil { - http.Error(w, fmt.Sprintf("Invalid time_range: %v", err), http.StatusBadRequest) - return - } - startTime = time.Now().Add(-duration) - } - - query := StorageQuery{ - Service: service, - StartTime: startTime, - Limit: 0, - OrderDesc: false, - } - - ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second) - defer cancel() - - entries, err := ws.storage.Query(ctx, query) - if err != nil { - slog.Error("Failed to query service stats", "service", service, "error", err) - http.Error(w, fmt.Sprintf("Query error: %v", err), http.StatusInternalServerError) - return - } - uniqueTransfersTotal := make(map[string]struct{}) - uniqueTransfersIncoming := make(map[string]struct{}) - uniqueTransfersOutgoing := make(map[string]struct{}) - uniqueTransfersNil := make(map[string]struct{}) - - for _, entry := range entries { - var identifier string - var direction string - - switch v := entry.ServiceInformation.(type) { - case models.TSTransferInfo: - identifier = v.TransferID - direction = v.Direction - case *models.TSTransferInfo: - identifier = v.TransferID - direction = v.Direction - case models.TJMTransferInfo: - identifier = v.TransferID - direction = v.Direction - case *models.TJMTransferInfo: - identifier = v.TransferID - direction = v.Direction - case map[string]any: - identifier, _ = v["transfer_identifier"].(string) - direction, _ = v["direction"].(string) - default: - continue - } - - if identifier != "" { - uniqueTransfersTotal[identifier] = struct{}{} - - switch strings.ToLower(direction) { - case "incoming": - uniqueTransfersIncoming[identifier] = struct{}{} - case "outgoing": - uniqueTransfersOutgoing[identifier] = struct{}{} - default: - uniqueTransfersNil[identifier] = struct{}{} - } - } - } - - stats := map[string]any{ - "service": service, - "start_time": startTime, - "end_time": time.Now(), - "transfer_counts": map[string]any{ - "total": len(uniqueTransfersTotal), - "incoming": len(uniqueTransfersIncoming), - "outgoing": len(uniqueTransfersOutgoing), - "nil_or_unknown_direction": len(uniqueTransfersNil), - }, - "entry_count": len(entries), - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(stats) -} - -func (ws *WebServiceV2) parseLogsQuery(r *http.Request) StorageQuery { - query := StorageQuery{ - Limit: 100, - OrderBy: "timestamp", - OrderDesc: true, - } - - if limitStr := r.URL.Query().Get("limit"); limitStr != "" { - if limit, err := strconv.Atoi(limitStr); err == nil && limit > 0 { - if limit > 10000 { - limit = 10000 - } - query.Limit = limit - } - } - - if offsetStr := r.URL.Query().Get("offset"); offsetStr != "" { - if offset, err := strconv.Atoi(offsetStr); err == nil && offset >= 0 { - query.Offset = offset - } - } - - if startTime := r.URL.Query().Get("start_time"); startTime != "" { - if t, err := time.Parse(time.RFC3339, startTime); err == nil { - query.StartTime = t - } - } - - if endTime := r.URL.Query().Get("end_time"); endTime != "" { - if t, err := time.Parse(time.RFC3339, endTime); err == nil { - query.EndTime = t - } - } - - if service := r.URL.Query().Get("service"); service != "" { - query.Service = strings.TrimSpace(service) - } - - if tool := r.URL.Query().Get("tool"); tool != "" { - query.Tool = strings.TrimSpace(tool) - } - - if logLevel := r.URL.Query().Get("log_level"); logLevel != "" { - query.LogLevel = strings.TrimSpace(logLevel) - } - - if entryType := r.URL.Query().Get("type"); entryType != "" { - query.Type = strings.TrimSpace(entryType) - } - - if orderBy := r.URL.Query().Get("order_by"); orderBy != "" { - validFields := []string{"timestamp", "service", "tool", "type", "log_level"} - if slices.Contains(validFields, orderBy) { - query.OrderBy = orderBy - } - } - - if orderDesc := r.URL.Query().Get("order_desc"); orderDesc != "" { - query.OrderDesc = orderDesc == "true" - } - - return query -}