From 366aac9edca36f3fc53e1cdefdc594b941c3ab6d Mon Sep 17 00:00:00 2001 From: Patryk Hegenberg Date: Tue, 23 Sep 2025 14:14:53 +0200 Subject: [PATCH] feat: implement new logic for local storage and exporters --- config.go | 110 ++++++++++++ elasticsearch.go | 3 + elasticsearch_exporterV2.go | 90 ++++++++++ export_manager.go | 169 ++++++++++++++++++ local_storageV2.go | 345 ++++++++++++++++++++++++++++++++++++ log_processor.go | 26 ++- main.go | 79 +++++++-- service_monitor.go | 3 + storage_interface.go | 31 ++++ system_metricsV2.go | 39 ++++ web_serviceV2.go | 308 ++++++++++++++++++++++++++++++++ 11 files changed, 1170 insertions(+), 33 deletions(-) create mode 100644 elasticsearch_exporterV2.go create mode 100644 export_manager.go create mode 100644 local_storageV2.go create mode 100644 storage_interface.go create mode 100644 system_metricsV2.go create mode 100644 web_serviceV2.go diff --git a/config.go b/config.go index 6818b77..065f3f0 100644 --- a/config.go +++ b/config.go @@ -3,10 +3,21 @@ package main import ( "fmt" "log" + "log/slog" + "time" "github.com/spf13/viper" ) +type ExportConfig struct { + Enabled bool `mapstructure:"enabled"` + BatchSize int `mapstructure:"batch_size"` + ExportInterval time.Duration `mapstructure:"export_interval"` + RetryAttempts int `mapstructure:"retry_attempts"` + RetryBackoff time.Duration `mapstructure:"retry_backoff"` + HealthCheckInterval time.Duration `mapstructure:"health_check_interval"` +} + type WebConfig struct { Enabled bool `mapstructure:"enabled"` Port int `mapstructure:"port"` @@ -41,6 +52,7 @@ type ElasticsearchConfig struct { Index string `mapstructure:"index"` Username string `mapstructure:"username"` Password string `mapstructure:"password"` + APIKey string `mapstructure:"api_key"` Timeout int `mapstructure:"timeout"` } @@ -73,6 +85,7 @@ type SystemMetrics struct { type Config struct { Elasticsearch ElasticsearchConfig `mapstructure:"elasticsearch"` LocalStorage LocalStorage `mapstrucutre:"localstorage"` + Export ExportConfig `mapstructure:"export"` Tools []ToolConfig `mapstructure:"tools"` Services []ServiceConfig `mapstructure:"services"` PollIntervalSeconds int `mapstructure:"poll_interval_seconds"` @@ -123,6 +136,29 @@ func setConfigDefaults() { viper.SetDefault("logging.level", "info") } +func setConfigDefaultsV2() { + viper.SetDefault("poll_interval_seconds", 30) + viper.SetDefault("elasticsearch.timeout", 30) + viper.SetDefault("system_metrics.enabled", true) + viper.SetDefault("system_metrics.collect_cpu", true) + viper.SetDefault("system_metrics.collect_memory", true) + viper.SetDefault("system_metrics.collect_disk", true) + viper.SetDefault("system_metrics.collect_network", false) + viper.SetDefault("system_metrics.disk_paths", []string{"/"}) + viper.SetDefault("web_service.enabled", false) + viper.SetDefault("web_service.port", 8080) + viper.SetDefault("web_service.host", "localhost") + viper.SetDefault("logging.level", "info") + viper.SetDefault("export.enabled", true) + viper.SetDefault("export.batch_size", 100) + viper.SetDefault("export.export_interval", "30s") + viper.SetDefault("export.retry_attempts", 3) + viper.SetDefault("export.retry_backoff", "5s") + viper.SetDefault("export.health_check_interval", "60s") + viper.SetDefault("localstorage.enabled", true) + viper.SetDefault("localstorage.db_path", "./tixel_watch.db") +} + func validateConfig(cfg *Config) error { if cfg.Elasticsearch.URL == "" { return fmt.Errorf("elasticsearch.url is required") @@ -145,3 +181,77 @@ func validateConfig(cfg *Config) error { return nil } + +func LoadConfigV2() (*Config, error) { + viper.SetConfigName("config") + viper.AddConfigPath(".") + viper.AddConfigPath("/opt/tixel/tixel-watch/") + viper.SetConfigType("yaml") + + setConfigDefaultsV2() + + if err := viper.ReadInConfig(); err != nil { + return nil, fmt.Errorf("error reading config: %w", err) + } + + var cfg Config + if err := viper.Unmarshal(&cfg); err != nil { + return nil, fmt.Errorf("error parsing config: %w", err) + } + + if err := validateConfigV2(&cfg); err != nil { + return nil, fmt.Errorf("config validation failed: %w", err) + } + + return &cfg, nil +} + +func validateConfigV2(cfg *Config) error { + if !cfg.LocalStorage.Enable { + return fmt.Errorf("local storage must be enabled in the new architecture") + } + + if cfg.LocalStorage.DBPath == "" { + return fmt.Errorf("local storage db_path is required") + } + + if cfg.Export.Enabled && cfg.Elasticsearch.Enabled { + if cfg.Elasticsearch.URL == "" { + return fmt.Errorf("elasticsearch.url is required when elasticsearch export is enabled") + } + if cfg.Elasticsearch.Index == "" { + return fmt.Errorf("elasticsearch.index is required when elasticsearch export is enabled") + } + } + + if cfg.PollIntervalSeconds <= 0 { + slog.Warn("poll_interval_seconds is invalid, setting to 30", "value", cfg.PollIntervalSeconds) + cfg.PollIntervalSeconds = 30 + } + + if cfg.Export.Enabled { + if cfg.Export.BatchSize <= 0 { + cfg.Export.BatchSize = 100 + } + if cfg.Export.ExportInterval <= 0 { + cfg.Export.ExportInterval = 30 * time.Second + } + if cfg.Export.RetryAttempts < 0 { + cfg.Export.RetryAttempts = 3 + } + if cfg.Export.RetryBackoff <= 0 { + cfg.Export.RetryBackoff = 5 * time.Second + } + if cfg.Export.HealthCheckInterval <= 0 { + cfg.Export.HealthCheckInterval = 60 * time.Second + } + } + + for i := range cfg.Tools { + if cfg.Tools[i].BufferSize <= 0 { + cfg.Tools[i].BufferSize = 100 + } + } + + return nil +} diff --git a/elasticsearch.go b/elasticsearch.go index 8ecc079..b8a7b11 100644 --- a/elasticsearch.go +++ b/elasticsearch.go @@ -20,6 +20,9 @@ func NewElasticsearchClient(config ElasticsearchConfig) (*elasticsearch.Client, esConfig.Username = config.Username esConfig.Password = config.Password } + if config.APIKey != "" { + esConfig.APIKey = config.APIKey + } client, err := elasticsearch.NewClient(esConfig) if err != nil { diff --git a/elasticsearch_exporterV2.go b/elasticsearch_exporterV2.go new file mode 100644 index 0000000..537c1f7 --- /dev/null +++ b/elasticsearch_exporterV2.go @@ -0,0 +1,90 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "strings" + "time" + + "github.com/elastic/go-elasticsearch/v8" +) + +type ElasticsearchExporterV2 struct { + client *elasticsearch.Client + config ElasticsearchConfig +} + +func NewElasticsearchExporterV2(config ElasticsearchConfig) (*ElasticsearchExporterV2, error) { + client, err := NewElasticsearchClient(config) + if err != nil { + return nil, fmt.Errorf("failed to create Elasticsearch client: %w", err) + } + + return &ElasticsearchExporterV2{ + client: client, + config: config, + }, nil +} + +func (e *ElasticsearchExporterV2) Export(ctx context.Context, entries []LogEntry) error { + if len(entries) == 0 { + return nil + } + + var body strings.Builder + for _, entry := range entries { + indexName := e.config.Index + + indexLine := fmt.Sprintf(`{"index":{"_index":"%s"}}`, indexName) + body.WriteString(indexLine) + body.WriteString("\n") + + data, err := json.Marshal(entry) + if err != nil { + slog.Error("error marshalling JSON", "error", err) + continue + } + body.WriteString(string(data)) + body.WriteString("\n") + } + + timeout := time.Duration(e.config.Timeout) * time.Second + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + res, err := e.client.Bulk( + strings.NewReader(body.String()), + e.client.Bulk.WithContext(ctx), + ) + if err != nil { + return fmt.Errorf("bulk request error: %w", err) + } + defer res.Body.Close() + + if res.IsError() { + return fmt.Errorf("bulk request failed: %s", res.String()) + } + + slog.Debug("Batch successfully exported to Elasticsearch", "count", len(entries)) + return nil +} + +func (e *ElasticsearchExporterV2) HealthCheck(ctx context.Context) error { + timeout := time.Duration(e.config.Timeout) * time.Second + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + res, err := e.client.Info(e.client.Info.WithContext(ctx)) + if err != nil { + return fmt.Errorf("health check failed: %w", err) + } + defer res.Body.Close() + + if res.IsError() { + return fmt.Errorf("health check failed: %s", res.String()) + } + + return nil +} diff --git a/export_manager.go b/export_manager.go new file mode 100644 index 0000000..a445325 --- /dev/null +++ b/export_manager.go @@ -0,0 +1,169 @@ +package main + +import ( + "context" + "fmt" + "log/slog" + "maps" + "sync" + "time" +) + +type ExportManager struct { + storage StorageInterface + exporters map[string]ExporterInterface + config ExportManagerConfig + mu sync.RWMutex +} + +type ExportManagerConfig struct { + BatchSize int + ExportInterval time.Duration + RetryAttempts int + RetryBackoff time.Duration + HealthCheckInterval time.Duration +} + +func NewExportManager(storage StorageInterface, config ExportManagerConfig) *ExportManager { + return &ExportManager{ + storage: storage, + exporters: make(map[string]ExporterInterface), + config: config, + } +} + +func (em *ExportManager) RegisterExporter(name string, exporter ExporterInterface) { + em.mu.Lock() + defer em.mu.Unlock() + em.exporters[name] = exporter + slog.Info("Exporter registered", "name", name) +} + +func (em *ExportManager) UnregisterExporter(name string) { + em.mu.Lock() + defer em.mu.Unlock() + delete(em.exporters, name) + slog.Info("Exporter unregistered", "name", name) +} + +func (em *ExportManager) Start(ctx context.Context) { + exportTicker := time.NewTicker(em.config.ExportInterval) + defer exportTicker.Stop() + + healthTicker := time.NewTicker(em.config.HealthCheckInterval) + defer healthTicker.Stop() + + slog.Info("Export manager started", + "batch_size", em.config.BatchSize, + "export_interval", em.config.ExportInterval, + ) + + for { + select { + case <-ctx.Done(): + slog.Info("Export manager stopped") + return + case <-exportTicker.C: + em.exportBatch(ctx) + case <-healthTicker.C: + em.performHealthChecks(ctx) + } + } +} + +func (em *ExportManager) exportBatch(ctx context.Context) { + entries, err := em.storage.(*SQLiteStorage).GetUnexportedEntries(ctx, em.config.BatchSize) + if err != nil { + slog.Error("Failed to get unexported entries", "error", err) + return + } + + if len(entries) == 0 { + return + } + + em.mu.RLock() + exporters := make(map[string]ExporterInterface) + maps.Copy(exporters, em.exporters) + em.mu.RUnlock() + + successfulExports := make(map[string]bool) + + for name, exporter := range exporters { + err := em.exportWithRetry(ctx, name, exporter, entries) + if err != nil { + slog.Error("Failed to export to target", "target", name, "error", err) + successfulExports[name] = false + } else { + slog.Debug("Successfully exported batch", "target", name, "count", len(entries)) + successfulExports[name] = true + } + } + + hasSuccess := false + for _, success := range successfulExports { + if success { + hasSuccess = true + break + } + } + + if hasSuccess { + ids := make([]int64, len(entries)) + for i, entry := range entries { + if id, ok := entry.Fields["_internal_id"].(int64); ok { + ids[i] = id + } + } + + if err := em.storage.(*SQLiteStorage).MarkAsExported(ctx, ids); err != nil { + slog.Error("Failed to mark entries as exported", "error", err) + } + } +} + +func (em *ExportManager) exportWithRetry(ctx context.Context, name string, exporter ExporterInterface, entries []LogEntry) error { + var lastErr error + + for attempt := 0; attempt <= em.config.RetryAttempts; attempt++ { + if attempt > 0 { + backoff := time.Duration(attempt) * em.config.RetryBackoff + slog.Debug("Retrying export", "target", name, "attempt", attempt, "backoff", backoff) + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(backoff): + } + } + + err := exporter.Export(ctx, entries) + if err == nil { + if attempt > 0 { + slog.Info("Export succeeded after retry", "target", name, "attempt", attempt) + } + return nil + } + + lastErr = err + slog.Warn("Export attempt failed", "target", name, "attempt", attempt, "error", err) + } + + return fmt.Errorf("export failed after %d attempts: %w", em.config.RetryAttempts, lastErr) +} + +func (em *ExportManager) performHealthChecks(ctx context.Context) { + em.mu.RLock() + exporters := make(map[string]ExporterInterface) + maps.Copy(exporters, em.exporters) + em.mu.RUnlock() + + for name, exporter := range exporters { + err := exporter.HealthCheck(ctx) + if err != nil { + slog.Warn("Health check failed", "target", name, "error", err) + } else { + slog.Debug("Health check passed", "target", name) + } + } +} diff --git a/local_storageV2.go b/local_storageV2.go new file mode 100644 index 0000000..58f1f39 --- /dev/null +++ b/local_storageV2.go @@ -0,0 +1,345 @@ +package main + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "strings" + + _ "modernc.org/sqlite" +) + +type SQLiteStorage struct { + db *sql.DB +} + +func NewSQLiteStorage(dbPath string) (*SQLiteStorage, error) { + db, err := sql.Open("sqlite", dbPath) + if err != nil { + return nil, fmt.Errorf("failed to open SQLite database: %w", err) + } + + if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil { + return nil, fmt.Errorf("failed to enable WAL mode: %w", err) + } + + if err := createTables(db); err != nil { + return nil, fmt.Errorf("failed to create tables: %w", err) + } + + return &SQLiteStorage{db: db}, nil +} + +func createTables(db *sql.DB) error { + createTableStmt := ` + CREATE TABLE IF NOT EXISTS log_entries ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + service TEXT, + timestamp DATETIME NOT NULL, + type TEXT NOT NULL, + host TEXT NOT NULL, + tool TEXT, + log_level TEXT, + log_message TEXT, + raw TEXT, + priority TEXT, + priority_name TEXT, + unit TEXT, + pid INTEGER, + boot_id TEXT, + machine_id TEXT, + fields TEXT, + service_information TEXT, + system_metrics TEXT, + tool_information TEXT, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + exported_at DATETIME + );` + + if _, err := db.Exec(createTableStmt); err != nil { + return err + } + + indexes := []string{ + "CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp);", + "CREATE INDEX IF NOT EXISTS idx_service ON log_entries(service);", + "CREATE INDEX IF NOT EXISTS idx_type ON log_entries(type);", + "CREATE INDEX IF NOT EXISTS idx_tool ON log_entries(tool);", + "CREATE INDEX IF NOT EXISTS idx_log_level ON log_entries(log_level);", + "CREATE INDEX IF NOT EXISTS idx_exported ON log_entries(exported_at);", + "CREATE INDEX IF NOT EXISTS idx_composite ON log_entries(timestamp, type, service);", + } + + for _, index := range indexes { + if _, err := db.Exec(index); err != nil { + return fmt.Errorf("failed to create index: %w", err) + } + } + + return nil +} + +func (s *SQLiteStorage) Store(ctx context.Context, entry *LogEntry) error { + return s.StoreBatch(ctx, []LogEntry{*entry}) +} + +func (s *SQLiteStorage) StoreBatch(ctx context.Context, entries []LogEntry) error { + if len(entries) == 0 { + return nil + } + + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + defer tx.Rollback() + + stmt, err := tx.PrepareContext(ctx, ` + INSERT INTO log_entries + (service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, + unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`) + if err != nil { + return fmt.Errorf("failed to prepare statement: %w", err) + } + defer stmt.Close() + + for _, entry := range entries { + fieldsJSON, _ := json.Marshal(entry.Fields) + serviceInfoJSON, _ := json.Marshal(entry.ServiceInformation) + systemMetricsJSON, _ := json.Marshal(entry.SystemMetrics) + toolInfoJSON, _ := json.Marshal(entry.ToolInformation) + + _, err := stmt.ExecContext(ctx, + entry.Service, + entry.Timestamp, + entry.Type, + entry.Host, + entry.Tool, + entry.LogLevel, + entry.LogMessage, + entry.Raw, + entry.Priority, + entry.PriorityName, + entry.Unit, + entry.PID, + entry.BootID, + entry.MachineID, + string(fieldsJSON), + string(serviceInfoJSON), + string(systemMetricsJSON), + string(toolInfoJSON), + ) + if err != nil { + return fmt.Errorf("failed to insert entry: %w", err) + } + } + + return tx.Commit() +} + +func (s *SQLiteStorage) Query(ctx context.Context, query StorageQuery) ([]LogEntry, error) { + sqlQuery := "SELECT service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information FROM log_entries WHERE 1=1" + args := []interface{}{} + argCount := 0 + + if !query.StartTime.IsZero() { + argCount++ + sqlQuery += fmt.Sprintf(" AND timestamp >= ?%d", argCount) + args = append(args, query.StartTime) + } + if !query.EndTime.IsZero() { + argCount++ + sqlQuery += fmt.Sprintf(" AND timestamp <= ?%d", argCount) + args = append(args, query.EndTime) + } + if query.Service != "" { + argCount++ + sqlQuery += fmt.Sprintf(" AND service = ?%d", argCount) + args = append(args, query.Service) + } + if query.Tool != "" { + argCount++ + sqlQuery += fmt.Sprintf(" AND tool = ?%d", argCount) + args = append(args, query.Tool) + } + if query.LogLevel != "" { + argCount++ + sqlQuery += fmt.Sprintf(" AND log_level = ?%d", argCount) + args = append(args, query.LogLevel) + } + if query.Type != "" { + argCount++ + sqlQuery += fmt.Sprintf(" AND type = ?%d", argCount) + args = append(args, query.Type) + } + + if query.OrderBy != "" { + direction := "ASC" + if query.OrderDesc { + direction = "DESC" + } + sqlQuery += fmt.Sprintf(" ORDER BY %s %s", query.OrderBy, direction) + } else { + sqlQuery += " ORDER BY timestamp DESC" + } + + if query.Limit > 0 { + sqlQuery += fmt.Sprintf(" LIMIT %d", query.Limit) + if query.Offset > 0 { + sqlQuery += fmt.Sprintf(" OFFSET %d", query.Offset) + } + } + + rows, err := s.db.QueryContext(ctx, sqlQuery, args...) + if err != nil { + return nil, fmt.Errorf("failed to execute query: %w", err) + } + defer rows.Close() + + var entries []LogEntry + for rows.Next() { + var entry LogEntry + var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string + + err := rows.Scan( + &entry.Service, + &entry.Timestamp, + &entry.Type, + &entry.Host, + &entry.Tool, + &entry.LogLevel, + &entry.LogMessage, + &entry.Raw, + &entry.Priority, + &entry.PriorityName, + &entry.Unit, + &entry.PID, + &entry.BootID, + &entry.MachineID, + &fieldsJSON, + &serviceInfoJSON, + &systemMetricsJSON, + &toolInfoJSON, + ) + if err != nil { + return nil, fmt.Errorf("failed to scan row: %w", err) + } + + if fieldsJSON != "" && fieldsJSON != "null" { + json.Unmarshal([]byte(fieldsJSON), &entry.Fields) + } + if serviceInfoJSON != "" && serviceInfoJSON != "null" { + json.Unmarshal([]byte(serviceInfoJSON), &entry.ServiceInformation) + } + if systemMetricsJSON != "" && systemMetricsJSON != "null" { + json.Unmarshal([]byte(systemMetricsJSON), &entry.SystemMetrics) + } + if toolInfoJSON != "" && toolInfoJSON != "null" { + json.Unmarshal([]byte(toolInfoJSON), &entry.ToolInformation) + } + + entries = append(entries, entry) + } + + return entries, rows.Err() +} + +func (s *SQLiteStorage) MarkAsExported(ctx context.Context, ids []int64) error { + if len(ids) == 0 { + return nil + } + + placeholders := strings.Repeat("?,", len(ids)) + placeholders = placeholders[:len(placeholders)-1] // Remove trailing comma + + sqlQuery := fmt.Sprintf("UPDATE log_entries SET exported_at = CURRENT_TIMESTAMP WHERE id IN (%s)", placeholders) + + args := make([]interface{}, len(ids)) + for i, id := range ids { + args[i] = id + } + + _, err := s.db.ExecContext(ctx, sqlQuery, args...) + return err +} + +func (s *SQLiteStorage) GetUnexportedEntries(ctx context.Context, limit int) ([]LogEntry, error) { + // query := StorageQuery{ + // Limit: limit, + // OrderBy: "timestamp", + // } + + sqlQuery := `SELECT id, service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, + unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information + FROM log_entries WHERE exported_at IS NULL ORDER BY timestamp ASC` + + if limit > 0 { + sqlQuery += fmt.Sprintf(" LIMIT %d", limit) + } + + rows, err := s.db.QueryContext(ctx, sqlQuery) + if err != nil { + return nil, fmt.Errorf("failed to execute query: %w", err) + } + defer rows.Close() + + var entries []LogEntry + for rows.Next() { + var entry LogEntry + var id int64 + var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string + + err := rows.Scan( + &id, + &entry.Service, + &entry.Timestamp, + &entry.Type, + &entry.Host, + &entry.Tool, + &entry.LogLevel, + &entry.LogMessage, + &entry.Raw, + &entry.Priority, + &entry.PriorityName, + &entry.Unit, + &entry.PID, + &entry.BootID, + &entry.MachineID, + &fieldsJSON, + &serviceInfoJSON, + &systemMetricsJSON, + &toolInfoJSON, + ) + if err != nil { + return nil, fmt.Errorf("failed to scan row: %w", err) + } + + if entry.Fields == nil { + entry.Fields = make(map[string]any) + } + entry.Fields["_internal_id"] = id + + if fieldsJSON != "" && fieldsJSON != "null" { + json.Unmarshal([]byte(fieldsJSON), &entry.Fields) + } + if serviceInfoJSON != "" && serviceInfoJSON != "null" { + json.Unmarshal([]byte(serviceInfoJSON), &entry.ServiceInformation) + } + if systemMetricsJSON != "" && systemMetricsJSON != "null" { + json.Unmarshal([]byte(systemMetricsJSON), &entry.SystemMetrics) + } + if toolInfoJSON != "" && toolInfoJSON != "null" { + json.Unmarshal([]byte(toolInfoJSON), &entry.ToolInformation) + } + + entries = append(entries, entry) + } + + return entries, rows.Err() +} + +func (s *SQLiteStorage) Close() error { + return s.db.Close() +} diff --git a/log_processor.go b/log_processor.go index 27b2e57..5921e11 100644 --- a/log_processor.go +++ b/log_processor.go @@ -4,20 +4,16 @@ import ( "context" "log/slog" "time" - - "github.com/elastic/go-elasticsearch/v8" ) type LogProcessor struct { - sender ElasticsearchSender - baseIndex string + storage StorageInterface batchSize int } -func NewLogProcessor(es *elasticsearch.Client, baseIndex string) *LogProcessor { +func NewLogProcessor(storage StorageInterface) *LogProcessor { return &LogProcessor{ - sender: NewElasticsearchSender(es), - baseIndex: baseIndex, + storage: storage, batchSize: 100, } } @@ -33,7 +29,7 @@ func (lp *LogProcessor) Start(ctx context.Context, logChan <-chan LogEntry) { select { case <-ctx.Done(): if len(batch) > 0 { - lp.sendBatch(batch) + lp.storeBatch(ctx, batch) } slog.Info("Log processor stopped") return @@ -41,7 +37,7 @@ func (lp *LogProcessor) Start(ctx context.Context, logChan <-chan LogEntry) { case entry, ok := <-logChan: if !ok { if len(batch) > 0 { - lp.sendBatch(batch) + lp.storeBatch(ctx, batch) } return } @@ -49,30 +45,30 @@ func (lp *LogProcessor) Start(ctx context.Context, logChan <-chan LogEntry) { batch = append(batch, entry) if len(batch) >= lp.batchSize { - lp.sendBatch(batch) + lp.storeBatch(ctx, batch) batch = batch[:0] } case <-ticker.C: if len(batch) > 0 { - lp.sendBatch(batch) + lp.storeBatch(ctx, batch) batch = batch[:0] } } } } -func (lp *LogProcessor) sendBatch(batch []LogEntry) { +func (lp *LogProcessor) storeBatch(ctx context.Context, batch []LogEntry) { if len(batch) == 0 { return } - if err := lp.sender.SendBatch(lp.baseIndex, batch); err != nil { - slog.Error("error sending log batch", "error", err, "batch_size", len(batch)) + if err := lp.storage.StoreBatch(ctx, batch); err != nil { + slog.Error("error storing log batch", "error", err, "batch_size", len(batch)) return } - slog.Debug("Log batch sent successfully", "batch_size", len(batch)) + slog.Debug("Log batch stored successfully", "batch_size", len(batch)) } func (lp *LogProcessor) SetBatchSize(size int) { diff --git a/main.go b/main.go index f6f29a4..0d829bd 100644 --- a/main.go +++ b/main.go @@ -21,25 +21,60 @@ func init() { } func main() { - cfg, err := LoadConfig() + cfg, err := LoadConfigV2() if err != nil { slog.Error("error loading configuration", "error", err) os.Exit(1) } slog.Info("TIXEL System Monitor started") - es, err := NewElasticsearchClient(cfg.Elasticsearch) - if err != nil { - slog.Error("elasticsearch client error", "error", err) + var storage StorageInterface + if cfg.LocalStorage.Enable { + sqliteStorage, err := NewSQLiteStorage(cfg.LocalStorage.DBPath) + if err != nil { + slog.Error("failed to initialize SQLite storage", "error", err) + os.Exit(1) + } + storage = sqliteStorage + defer storage.Close() + slog.Info("SQLite storage initialized", "path", cfg.LocalStorage.DBPath) + } else { + slog.Error("Local storage is disabled, but it's required for the new architecture") os.Exit(1) } - if err := TestElasticsearchConnection(es); err != nil { - slog.Error("elasticsearch connection test failed", "error", err) - os.Exit(1) - } + var exportManager *ExportManager + if cfg.Export.Enabled { + exportConfig := ExportManagerConfig{ + BatchSize: cfg.Export.BatchSize, + ExportInterval: cfg.Export.ExportInterval, + RetryAttempts: cfg.Export.RetryAttempts, + RetryBackoff: cfg.Export.RetryBackoff, + HealthCheckInterval: cfg.Export.HealthCheckInterval, + } - slog.Info("Elasticsearch connection successful") + exportManager = NewExportManager(storage, exportConfig) + + if cfg.Elasticsearch.Enabled { + esExporter, err := NewElasticsearchExporterV2(cfg.Elasticsearch) + if err != nil { + slog.Error("failed to create Elasticsearch exporter", "error", err) + os.Exit(1) + } + + if err := esExporter.HealthCheck(context.Background()); err != nil { + slog.Error("Elasticsearch health check failed", "error", err) + os.Exit(1) + } + + exportManager.RegisterExporter("elasticsearch", esExporter) + slog.Info("Elasticsearch exporter registered") + } + + // Add more exporters here in the future + // exportManager.RegisterExporter("checkmk", checkmkExporter) + // exportManager.RegisterExporter("grafana", grafanaExporter) + } logChan := make(chan LogEntry, 1000) ctx, cancel := context.WithCancel(context.Background()) @@ -47,6 +82,21 @@ func main() { var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + processor := NewLogProcessor(storage) + processor.Start(ctx, logChan) + }() + + if exportManager != nil { + wg.Add(1) + go func() { + defer wg.Done() + exportManager.Start(ctx) + }() + } + for _, service := range cfg.Services { if !service.Enabled { slog.Info("Service deactivated, skipping...", "service", service.Name) @@ -88,23 +138,16 @@ func main() { go func() { defer wg.Done() collector := NewSystemMetricsCollector(cfg.SystemMetrics, cfg.PollIntervalSeconds) - collector.Start(ctx, es, cfg.Elasticsearch.Index) + collector.StartV2(ctx, storage, logChan) }() slog.Info("Started collecting System-Metrics") } - wg.Add(1) - go func() { - defer wg.Done() - processor := NewLogProcessor(es, cfg.Elasticsearch.Index) - processor.Start(ctx, logChan) - }() - if cfg.WebService.Enabled { wg.Add(1) go func() { defer wg.Done() - webService := NewWebService(cfg, es) + webService := NewWebServiceV2(cfg, storage) if err := webService.Start(ctx); err != nil { slog.Error("web service error", "error", err) } diff --git a/service_monitor.go b/service_monitor.go index 187885e..e91a4a8 100644 --- a/service_monitor.go +++ b/service_monitor.go @@ -159,6 +159,9 @@ func (jep *JournalEntryParser) Parse(jsonLine string) (LogEntry, error) { if machineID, ok := journalData["_MACHINE_ID"].(string); ok { entry.MachineID = machineID } + if hostname, ok := journalData["_HOSTNAME"].(string); ok { + entry.Host = hostname + } entry.Raw = jsonLine diff --git a/storage_interface.go b/storage_interface.go new file mode 100644 index 0000000..c3039e7 --- /dev/null +++ b/storage_interface.go @@ -0,0 +1,31 @@ +package main + +import ( + "context" + "time" +) + +type StorageInterface interface { + Store(ctx context.Context, entry *LogEntry) error + StoreBatch(ctx context.Context, entries []LogEntry) error + Query(ctx context.Context, query StorageQuery) ([]LogEntry, error) + Close() error +} + +type StorageQuery struct { + StartTime time.Time + EndTime time.Time + Service string + Tool string + LogLevel string + Type string + Limit int + Offset int + OrderBy string + OrderDesc bool +} + +type ExporterInterface interface { + Export(ctx context.Context, entries []LogEntry) error + HealthCheck(ctx context.Context) error +} diff --git a/system_metricsV2.go b/system_metricsV2.go new file mode 100644 index 0000000..69dd07f --- /dev/null +++ b/system_metricsV2.go @@ -0,0 +1,39 @@ +package main + +import ( + "context" + "log/slog" + "time" +) + +func (smc *SystemMetricsCollector) StartV2(ctx context.Context, storage StorageInterface, logChan chan<- LogEntry) { + ticker := time.NewTicker(time.Duration(smc.pollInterval) * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + slog.Info("System metrics collector stopped") + return + case <-ticker.C: + metrics, err := smc.collectMetrics() + if err != nil { + slog.Error("error collecting system metrics", "error", err) + continue + } + + entry := NewLogEntry("system_metrics") + entry.Service = "system-metrics" + entry.LogLevel = "Info" + entry.SystemMetrics = metrics + + select { + case logChan <- entry: + case <-ctx.Done(): + return + default: + slog.Warn("Log channel is full, system metrics dropped") + } + } + } +} diff --git a/web_serviceV2.go b/web_serviceV2.go new file mode 100644 index 0000000..61b5f73 --- /dev/null +++ b/web_serviceV2.go @@ -0,0 +1,308 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "net/http" + "os/exec" + "slices" + "strconv" + "strings" + "time" +) + +type WebServiceV2 struct { + server *http.Server + storage StorageInterface + config *Config +} + +func LoggingMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + slog.Debug("WebService", "Remote-Address", r.RemoteAddr, "Method", r.Method, "Path", r.URL.Path) + next.ServeHTTP(w, r) + }) +} + +func NewWebServiceV2(config *Config, storage StorageInterface) *WebServiceV2 { + mux := http.NewServeMux() + + ws := &WebServiceV2{ + storage: storage, + config: config, + } + + mux.HandleFunc("GET /health", ws.handleHealth) + mux.HandleFunc("GET /logs", ws.handleLogs) + mux.HandleFunc("GET /export", ws.handleExport) + mux.HandleFunc("GET /stats", ws.handleStats) + + loggedMux := LoggingMiddleware(mux) + + addr := fmt.Sprintf("%s:%d", config.WebService.Host, config.WebService.Port) + ws.server = &http.Server{ + Addr: addr, + Handler: loggedMux, + ReadTimeout: 30 * time.Second, + WriteTimeout: 300 * time.Second, + IdleTimeout: 60 * time.Second, + } + + return ws +} + +func (ws *WebServiceV2) Start(ctx context.Context) error { + go func() { + <-ctx.Done() + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := ws.server.Shutdown(shutdownCtx); err != nil { + slog.Error("web service shutdown error", "error", err) + } + }() + + slog.Info("Starting web service", "address", ws.server.Addr) + + if err := ws.server.ListenAndServe(); err != http.ErrServerClosed { + return fmt.Errorf("web service error: %w", err) + } + + return nil +} + +func (ws *WebServiceV2) handleHealth(w http.ResponseWriter, r *http.Request) { + // if r.Method != http.MethodGet { + // http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + // return + // } + + status := map[string]any{ + "status": "healthy", + "timestamp": time.Now(), + "storage": "sqlite", + } + + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + _, err := ws.storage.Query(ctx, StorageQuery{ + Limit: 1, + }) + + if err != nil { + status["storage_status"] = "unhealthy" + status["storage_error"] = err.Error() + w.WriteHeader(http.StatusServiceUnavailable) + } else { + status["storage_status"] = "healthy" + } + statusMap := make(map[string]any) + statusMap["tixel-watch"] = status + + for _, service := range ws.config.Services { + statusCommand := []string{"sudo", "systemctl", "status", service.Name, "--no-pager"} + if service.Enabled { + serviceStatus, err := exec.Command(statusCommand[0], statusCommand[1:]...).Output() + if err != nil { + slog.Error("error executing status command", "error", err) + continue + } + lines := strings.SplitSeq(string(serviceStatus), "\n") + for line := range lines { + if strings.Contains(line, "Active:") { + serviceHealth, found := strings.CutPrefix(strings.TrimSpace(line), "Active:") + if found { + statusMap[service.Name] = map[string]any{"status": serviceHealth, "timestamp": time.Now()} + } + } + } + } + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(statusMap) +} + +func (ws *WebServiceV2) handleLogs(w http.ResponseWriter, r *http.Request) { + // if r.Method != http.MethodGet { + // http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + // return + // } + + query := ws.parseLogsQuery(r) + + ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second) + defer cancel() + + entries, err := ws.storage.Query(ctx, query) + if err != nil { + http.Error(w, fmt.Sprintf("Query error: %v", err), http.StatusInternalServerError) + return + } + + response := map[string]any{ + "entries": entries, + "count": len(entries), + "query": query, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +func (ws *WebServiceV2) handleExport(w http.ResponseWriter, r *http.Request) { + // if r.Method != http.MethodGet { + // http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + // return + // } + + query := ws.parseLogsQuery(r) + + ctx, cancel := context.WithTimeout(r.Context(), 300*time.Second) + defer cancel() + + entries, err := ws.storage.Query(ctx, query) + if err != nil { + http.Error(w, fmt.Sprintf("Export query error: %v", err), http.StatusInternalServerError) + return + } + + filename := fmt.Sprintf("tixel_export_%s.json", time.Now().Format("20060102_150405")) + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", filename)) + + exportData := map[string]any{ + "export_info": map[string]any{ + "timestamp": time.Now(), + "entry_count": len(entries), + "query": query, + "exported_by": "tixel-watch", + }, + "entries": entries, + } + + if err := json.NewEncoder(w).Encode(exportData); err != nil { + slog.Error("Failed to encode export data", "error", err) + return + } + + slog.Info("Data exported", "count", len(entries), "query", query) +} + +func (ws *WebServiceV2) handleStats(w http.ResponseWriter, r *http.Request) { + // if r.Method != http.MethodGet { + // http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + // return + // } + + ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) + defer cancel() + + allEntries, err := ws.storage.Query(ctx, StorageQuery{}) + if err != nil { + http.Error(w, fmt.Sprintf("Stats query error: %v", err), http.StatusInternalServerError) + return + } + + recentEntries, err := ws.storage.Query(ctx, StorageQuery{ + StartTime: time.Now().Add(-time.Hour), + }) + if err != nil { + slog.Error("Failed to query recent entries", "error", err) + recentEntries = []LogEntry{} + } + + stats := map[string]any{ + "total_entries": len(allEntries), + "recent_entries": len(recentEntries), + "timestamp": time.Now(), + } + + typeCounts := make(map[string]int) + serviceCounts := make(map[string]int) + toolCounts := make(map[string]int) + + for _, entry := range allEntries { + typeCounts[entry.Type]++ + if entry.Service != "" { + serviceCounts[entry.Service]++ + } + if entry.Tool != "" { + toolCounts[entry.Tool]++ + } + } + + stats["by_type"] = typeCounts + stats["by_service"] = serviceCounts + stats["by_tool"] = toolCounts + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(stats) +} + +func (ws *WebServiceV2) parseLogsQuery(r *http.Request) StorageQuery { + query := StorageQuery{ + Limit: 100, + OrderBy: "timestamp", + OrderDesc: true, + } + + if limitStr := r.URL.Query().Get("limit"); limitStr != "" { + if limit, err := strconv.Atoi(limitStr); err == nil && limit > 0 { + if limit > 10000 { + limit = 10000 + } + query.Limit = limit + } + } + + if offsetStr := r.URL.Query().Get("offset"); offsetStr != "" { + if offset, err := strconv.Atoi(offsetStr); err == nil && offset >= 0 { + query.Offset = offset + } + } + + if startTime := r.URL.Query().Get("start_time"); startTime != "" { + if t, err := time.Parse(time.RFC3339, startTime); err == nil { + query.StartTime = t + } + } + + if endTime := r.URL.Query().Get("end_time"); endTime != "" { + if t, err := time.Parse(time.RFC3339, endTime); err == nil { + query.EndTime = t + } + } + + if service := r.URL.Query().Get("service"); service != "" { + query.Service = strings.TrimSpace(service) + } + + if tool := r.URL.Query().Get("tool"); tool != "" { + query.Tool = strings.TrimSpace(tool) + } + + if logLevel := r.URL.Query().Get("log_level"); logLevel != "" { + query.LogLevel = strings.TrimSpace(logLevel) + } + + if entryType := r.URL.Query().Get("type"); entryType != "" { + query.Type = strings.TrimSpace(entryType) + } + + if orderBy := r.URL.Query().Get("order_by"); orderBy != "" { + validFields := []string{"timestamp", "service", "tool", "type", "log_level"} + if slices.Contains(validFields, orderBy) { + query.OrderBy = orderBy + } + } + + if orderDesc := r.URL.Query().Get("order_desc"); orderDesc != "" { + query.OrderDesc = orderDesc == "true" + } + + return query +}