From 1f07632ae209ca95a52cb8bcbf8175112ff76707 Mon Sep 17 00:00:00 2001 From: Patryk Hegenberg Date: Thu, 25 Sep 2025 10:23:48 +0200 Subject: [PATCH] feat: implement log-rotation for local sqlite storage --- config.go | 57 +++++++++- local_storageV2.go | 262 +++++++++++++++++++++++++++++++++++++++++++-- main.go | 12 ++- service_monitor.go | 3 + web_serviceV2.go | 16 --- 5 files changed, 322 insertions(+), 28 deletions(-) diff --git a/config.go b/config.go index 065f3f0..93f0a88 100644 --- a/config.go +++ b/config.go @@ -57,8 +57,9 @@ type ElasticsearchConfig struct { } type LocalStorage struct { - Enable bool `mapstructure:"enabled"` - DBPath string `mapstructure:"db_path"` + Enable bool `mapstructure:"enabled"` + DBPath string `mapstructure:"db_path"` + RotationConfig StorageRotationConfig `mapstructure:"rotation"` } type SystemMetrics struct { @@ -97,6 +98,33 @@ type Config struct { } `mapstructure:"logging"` } +type StorageRotationConfig struct { + // MaxSizeBytes is the maximum size of the database in bytes (0 = deactivated) + MaxSizeBytes int64 `mapstructure:"max_size_bytes"` + // MaxAgeHours is the maximum age of the database (0 = deaactivated) + MaxAgeHours time.Duration `mapstructure:"max_age_hours"` + // MaxFiles is the maximum count of old files, to keep + MaxFiles int `mapstructure:"max_files"` + // CheckIntervalMinutes is the intervall for checking rotation conditions + CheckIntervalMinutes time.Duration `mapstructure:"check_interval_minutes"` + // ArchiveDir is the dir to store archived files (empty = same dir as db) + ArchiveDir string `mapstructure:"archive_dir"` +} + +func (src StorageRotationConfig) GetMaxAge() time.Duration { + if src.MaxAgeHours <= 0 { + return 0 + } + return time.Duration(src.MaxAgeHours) * time.Hour +} + +func (src StorageRotationConfig) GetCheckInterval() time.Duration { + if src.CheckIntervalMinutes <= 0 { + return 5 * time.Minute + } + return time.Duration(src.CheckIntervalMinutes) * time.Minute +} + func LoadConfig() (*Config, error) { viper.SetConfigName("config") viper.AddConfigPath(".") @@ -157,6 +185,11 @@ func setConfigDefaultsV2() { viper.SetDefault("export.health_check_interval", "60s") viper.SetDefault("localstorage.enabled", true) viper.SetDefault("localstorage.db_path", "./tixel_watch.db") + viper.SetDefault("localstorage.rotation.max_size_bytes", int64(100*1024*1024)) + viper.SetDefault("localstorage.rotation.max_age_hours", 24) + viper.SetDefault("localstorage.rotation.max_files", 7) + viper.SetDefault("localstorage.rotation.check_interval_minutes", 5) + viper.SetDefault("localstorage.rotation.archive_dir", "") } func validateConfig(cfg *Config) error { @@ -253,5 +286,25 @@ func validateConfigV2(cfg *Config) error { } } + if cfg.LocalStorage.RotationConfig.MaxSizeBytes < 0 { + slog.Warn("Invalid rotation max_size_bytes, setting to 100MB", "value", cfg.LocalStorage.RotationConfig.MaxSizeBytes) + cfg.LocalStorage.RotationConfig.MaxSizeBytes = 100 * 1024 * 1024 + } + + if cfg.LocalStorage.RotationConfig.MaxAgeHours < 0 { + slog.Warn("Invalid rotation max_age_hours, setting to 24", "value", cfg.LocalStorage.RotationConfig.MaxAgeHours) + cfg.LocalStorage.RotationConfig.MaxAgeHours = 24 + } + + if cfg.LocalStorage.RotationConfig.MaxFiles < 0 { + slog.Warn("Invalid rotation max_files, setting to 7", "value", cfg.LocalStorage.RotationConfig.MaxFiles) + cfg.LocalStorage.RotationConfig.MaxFiles = 7 + } + + if cfg.LocalStorage.RotationConfig.CheckIntervalMinutes < 1 { + slog.Warn("Invalid rotation check_interval_minutes, setting to 5", "value", cfg.LocalStorage.RotationConfig.CheckIntervalMinutes) + cfg.LocalStorage.RotationConfig.CheckIntervalMinutes = 5 + } + return nil } diff --git a/local_storageV2.go b/local_storageV2.go index bf2f2d3..f6c7dea 100644 --- a/local_storageV2.go +++ b/local_storageV2.go @@ -5,17 +5,46 @@ import ( "database/sql" "encoding/json" "fmt" + "log/slog" + "os" + "path/filepath" + "sort" "strings" + "sync" + "time" "tixel_watch/models" _ "modernc.org/sqlite" ) type SQLiteStorage struct { - db *sql.DB + db *sql.DB + dbPath string + rotationCfg StorageRotationConfig + rotationStop chan struct{} + rotationWg sync.WaitGroup + mu sync.RWMutex +} + +func DefaultRotationConfig() StorageRotationConfig { + return StorageRotationConfig{ + MaxSizeBytes: 100 * 1024 * 1024, // 100MB + MaxAgeHours: 48 * time.Hour, // 48 hours + MaxFiles: 3, // 3 old Files + CheckIntervalMinutes: 5 * time.Minute, // check every 5 minutes + ArchiveDir: "", // same directory + } } func NewSQLiteStorage(dbPath string) (*SQLiteStorage, error) { + return NewSQLiteStorageWithRotation(dbPath, StorageRotationConfig{}) +} + +func NewSQLiteStorageWithRotation(dbPath string, rotationCfg StorageRotationConfig) (*SQLiteStorage, error) { + if rotationCfg.CheckIntervalMinutes == 0 { + rotationCfg = DefaultRotationConfig() + } + db, err := sql.Open("sqlite", dbPath) if err != nil { return nil, fmt.Errorf("failed to open SQLite database: %w", err) @@ -29,7 +58,211 @@ func NewSQLiteStorage(dbPath string) (*SQLiteStorage, error) { return nil, fmt.Errorf("failed to create tables: %w", err) } - return &SQLiteStorage{db: db}, nil + storage := &SQLiteStorage{ + db: db, + dbPath: dbPath, + rotationCfg: rotationCfg, + rotationStop: make(chan struct{}), + } + + if rotationCfg.MaxSizeBytes > 0 || rotationCfg.MaxAgeHours > 0 { + storage.rotationWg.Add(1) + go storage.rotationWorker() + slog.Info("Log rotation enabled", + "maxSize", rotationCfg.MaxSizeBytes, + "maxAge", rotationCfg.MaxAgeHours, + "maxFiles", rotationCfg.MaxFiles) + } + + return storage, nil +} + +func (s *SQLiteStorage) rotationWorker() { + defer s.rotationWg.Done() + ticker := time.NewTicker(s.rotationCfg.CheckIntervalMinutes) + defer ticker.Stop() + + for { + select { + case <-s.rotationStop: + return + case <-ticker.C: + if err := s.checkAndRotate(); err != nil { + slog.Error("Error during log rotation check", "error", err) + } + } + } +} + +func (s *SQLiteStorage) checkAndRotate() error { + s.mu.Lock() + defer s.mu.Unlock() + + needsRotation, reason, err := s.needsRotation() + if err != nil { + return fmt.Errorf("error checking rotation needs: %w", err) + } + + if needsRotation { + slog.Info("Starting log rotation", "reason", reason) + if err := s.rotateDatabase(); err != nil { + return fmt.Errorf("error rotating database: %w", err) + } + slog.Info("Log rotation completed successfully") + } + + return nil +} + +func (s *SQLiteStorage) needsRotation() (bool, string, error) { + if s.rotationCfg.MaxSizeBytes > 0 { + fileInfo, err := os.Stat(s.dbPath) + if err != nil { + return false, "", err + } + if fileInfo.Size() >= s.rotationCfg.MaxSizeBytes { + return true, fmt.Sprintf("file size %d >= max size %d", fileInfo.Size(), s.rotationCfg.MaxSizeBytes), nil + } + } + + if s.rotationCfg.MaxAgeHours > 0 { + fileInfo, err := os.Stat(s.dbPath) + if err != nil { + return false, "", err + } + age := time.Since(fileInfo.ModTime()) + if age >= s.rotationCfg.MaxAgeHours { + return true, fmt.Sprintf("file age %v >= max age %v", age, s.rotationCfg.MaxAgeHours), nil + } + } + + return false, "", nil +} + +func (s *SQLiteStorage) rotateDatabase() error { + if err := s.db.Close(); err != nil { + return fmt.Errorf("error closing database: %w", err) + } + + archivePath := s.generateArchivePath() + + if err := os.Rename(s.dbPath, archivePath); err != nil { + return fmt.Errorf("error moving database to archive: %w", err) + } + + db, err := sql.Open("sqlite", s.dbPath) + if err != nil { + return fmt.Errorf("error opening new database: %w", err) + } + + if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil { + return fmt.Errorf("failed to enable WAL mode on new database: %w", err) + } + + if err := createTables(db); err != nil { + return fmt.Errorf("failed to create tables in new database: %w", err) + } + + s.db = db + + if err := s.cleanupOldArchives(); err != nil { + slog.Warn("Error cleaning up old archives", "error", err) + } + + return nil +} + +func (s *SQLiteStorage) generateArchivePath() string { + dir := filepath.Dir(s.dbPath) + if s.rotationCfg.ArchiveDir != "" { + dir = s.rotationCfg.ArchiveDir + os.MkdirAll(dir, 0755) + } + + base := filepath.Base(s.dbPath) + ext := filepath.Ext(base) + name := strings.TrimSuffix(base, ext) + + timestamp := time.Now().Format("2006-01-02_15-04-05") + archiveName := fmt.Sprintf("%s.%s%s", name, timestamp, ext) + + return filepath.Join(dir, archiveName) +} + +func (s *SQLiteStorage) cleanupOldArchives() error { + if s.rotationCfg.MaxFiles <= 0 { + return nil + } + + dir := filepath.Dir(s.dbPath) + if s.rotationCfg.ArchiveDir != "" { + dir = s.rotationCfg.ArchiveDir + } + + base := filepath.Base(s.dbPath) + ext := filepath.Ext(base) + name := strings.TrimSuffix(base, ext) + pattern := fmt.Sprintf("%s.*%s", name, ext) + + files, err := filepath.Glob(filepath.Join(dir, pattern)) + if err != nil { + return err + } + + var archives []string + for _, file := range files { + if file != s.dbPath { + archives = append(archives, file) + } + } + + sort.Slice(archives, func(i, j int) bool { + infoI, _ := os.Stat(archives[i]) + infoJ, _ := os.Stat(archives[j]) + return infoI.ModTime().After(infoJ.ModTime()) + }) + + if len(archives) > s.rotationCfg.MaxFiles { + for _, file := range archives[s.rotationCfg.MaxFiles:] { + if err := os.Remove(file); err != nil { + slog.Warn("Error removing old archive", "file", file, "error", err) + } else { + slog.Info("Removed old archive", "file", file) + } + } + } + + return nil +} + +func (s *SQLiteStorage) ForceRotate() error { + s.mu.Lock() + defer s.mu.Unlock() + + slog.Info("Forcing log rotation") + return s.rotateDatabase() +} + +func (s *SQLiteStorage) GetRotationInfo() (map[string]any, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + fileInfo, err := os.Stat(s.dbPath) + if err != nil { + return nil, err + } + + info := map[string]any{ + "currentSize": fileInfo.Size(), + "maxSize": s.rotationCfg.MaxSizeBytes, + "currentAge": time.Since(fileInfo.ModTime()).String(), + "maxAge": s.rotationCfg.MaxAgeHours.String(), + "maxFiles": s.rotationCfg.MaxFiles, + "checkInterval": s.rotationCfg.CheckIntervalMinutes.String(), + "archiveDir": s.rotationCfg.ArchiveDir, + } + + return info, nil } func createTables(db *sql.DB) error { @@ -90,6 +323,9 @@ func (s *SQLiteStorage) StoreBatch(ctx context.Context, entries []models.LogMess return nil } + s.mu.RLock() + defer s.mu.RUnlock() + tx, err := s.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("failed to begin transaction: %w", err) @@ -141,6 +377,9 @@ func (s *SQLiteStorage) StoreBatch(ctx context.Context, entries []models.LogMess } func (s *SQLiteStorage) Query(ctx context.Context, query StorageQuery) ([]models.LogMessage, error) { + s.mu.RLock() + defer s.mu.RUnlock() + sqlQuery := "SELECT service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information FROM log_entries WHERE 1=1" args := []any{} argCount := 0 @@ -252,6 +491,9 @@ func (s *SQLiteStorage) MarkAsExported(ctx context.Context, ids []int64) error { return nil } + s.mu.RLock() + defer s.mu.RUnlock() + tx, err := s.db.BeginTx(ctx, nil) if err != nil { return err @@ -273,15 +515,13 @@ func (s *SQLiteStorage) MarkAsExported(ctx context.Context, ids []int64) error { return err } - err = tx.Commit() - if err != nil { - return err - } - - return err + return tx.Commit() } func (s *SQLiteStorage) GetUnexportedEntries(ctx context.Context, limit int) ([]models.LogMessage, error) { + s.mu.RLock() + defer s.mu.RUnlock() + sqlQuery := `SELECT id, service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information FROM log_entries WHERE exported_at IS NULL ORDER BY timestamp ASC` @@ -352,5 +592,11 @@ func (s *SQLiteStorage) GetUnexportedEntries(ctx context.Context, limit int) ([] } func (s *SQLiteStorage) Close() error { + close(s.rotationStop) + s.rotationWg.Wait() + + s.mu.Lock() + defer s.mu.Unlock() + return s.db.Close() } diff --git a/main.go b/main.go index 79e5dfe..9ae138a 100644 --- a/main.go +++ b/main.go @@ -31,14 +31,22 @@ func main() { var storage StorageInterface if cfg.LocalStorage.Enable { - sqliteStorage, err := NewSQLiteStorage(cfg.LocalStorage.DBPath) + rotationConfig := StorageRotationConfig{ + MaxSizeBytes: cfg.LocalStorage.RotationConfig.MaxSizeBytes, + MaxAgeHours: cfg.LocalStorage.RotationConfig.GetMaxAge(), + MaxFiles: cfg.LocalStorage.RotationConfig.MaxFiles, + CheckIntervalMinutes: cfg.LocalStorage.RotationConfig.GetCheckInterval(), + ArchiveDir: cfg.LocalStorage.RotationConfig.ArchiveDir, + } + + sqliteStorage, err := NewSQLiteStorageWithRotation(cfg.LocalStorage.DBPath, rotationConfig) if err != nil { slog.Error("failed to initialize SQLite storage", "error", err) os.Exit(1) } storage = sqliteStorage defer storage.Close() - slog.Info("SQLite storage initialized", "path", cfg.LocalStorage.DBPath) + slog.Info("SQLite storage with rotation initialized", "path", cfg.LocalStorage.DBPath) } else { slog.Error("Local storage is disabled, but it's required for the new architecture") os.Exit(1) diff --git a/service_monitor.go b/service_monitor.go index c52ae5c..61a7021 100644 --- a/service_monitor.go +++ b/service_monitor.go @@ -202,6 +202,9 @@ func (jep *JournalEntryParser) extractSystemdFields(journalData map[string]any, for _, field := range systemdFields { if value, ok := journalData[field]; ok { esFieldName := strings.ToLower(strings.TrimPrefix(field, "_")) + if entry.Fields == nil { + entry.Fields = make(map[string]any) + } entry.Fields[esFieldName] = value } } diff --git a/web_serviceV2.go b/web_serviceV2.go index a112c05..d03f7a0 100644 --- a/web_serviceV2.go +++ b/web_serviceV2.go @@ -75,10 +75,6 @@ func (ws *WebServiceV2) Start(ctx context.Context) error { } func (ws *WebServiceV2) handleHealth(w http.ResponseWriter, r *http.Request) { - // if r.Method != http.MethodGet { - // http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) - // return - // } status := map[string]any{ "status": "healthy", @@ -128,10 +124,6 @@ func (ws *WebServiceV2) handleHealth(w http.ResponseWriter, r *http.Request) { } func (ws *WebServiceV2) handleLogs(w http.ResponseWriter, r *http.Request) { - // if r.Method != http.MethodGet { - // http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) - // return - // } query := ws.parseLogsQuery(r) @@ -155,10 +147,6 @@ func (ws *WebServiceV2) handleLogs(w http.ResponseWriter, r *http.Request) { } func (ws *WebServiceV2) handleExport(w http.ResponseWriter, r *http.Request) { - // if r.Method != http.MethodGet { - // http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) - // return - // } query := ws.parseLogsQuery(r) @@ -194,10 +182,6 @@ func (ws *WebServiceV2) handleExport(w http.ResponseWriter, r *http.Request) { } func (ws *WebServiceV2) handleStats(w http.ResponseWriter, r *http.Request) { - // if r.Method != http.MethodGet { - // http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) - // return - // } ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) defer cancel()