package main import ( "context" "database/sql" "encoding/json" "fmt" "log/slog" "os" "path/filepath" "sort" "strings" "sync" "time" "watch-tool/models" _ "modernc.org/sqlite" ) type SQLiteStorage struct { db *sql.DB dbPath string rotationCfg StorageRotationConfig rotationStop chan struct{} rotationWg sync.WaitGroup mu sync.RWMutex } func DefaultRotationConfig() StorageRotationConfig { return StorageRotationConfig{ MaxSizeBytes: 100 * 1024 * 1024, // 100MB MaxAgeHours: 48 * time.Hour, // 48 hours MaxFiles: 3, // 3 old Files CheckIntervalMinutes: 5 * time.Minute, // check every 5 minutes ArchiveDir: "", // same directory } } func NewSQLiteStorage(dbPath string) (*SQLiteStorage, error) { return NewSQLiteStorageWithRotation(dbPath, StorageRotationConfig{}) } func NewSQLiteStorageWithRotation(dbPath string, rotationCfg StorageRotationConfig) (*SQLiteStorage, error) { if rotationCfg.CheckIntervalMinutes == 0 { rotationCfg = DefaultRotationConfig() } dsn := fmt.Sprintf("%s?_busy_timeout=5000&_journal_mode=WAL", dbPath) db, err := sql.Open("sqlite", dsn) if err != nil { return nil, fmt.Errorf("failed to open SQLite database: %w", err) } if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil { return nil, fmt.Errorf("failed to enable WAL mode: %w", err) } if err := createTables(db); err != nil { return nil, fmt.Errorf("failed to create tables: %w", err) } storage := &SQLiteStorage{ db: db, dbPath: dbPath, rotationCfg: rotationCfg, rotationStop: make(chan struct{}), } if rotationCfg.MaxSizeBytes > 0 || rotationCfg.MaxAgeHours > 0 { storage.rotationWg.Add(1) go storage.rotationWorker() slog.Info("Log rotation enabled", "maxSize", rotationCfg.MaxSizeBytes, "maxAge", rotationCfg.MaxAgeHours, "maxFiles", rotationCfg.MaxFiles) } return storage, nil } func (s *SQLiteStorage) rotationWorker() { defer s.rotationWg.Done() ticker := time.NewTicker(s.rotationCfg.CheckIntervalMinutes) defer ticker.Stop() for { select { case <-s.rotationStop: return case <-ticker.C: if err := s.checkAndRotate(); err != nil { slog.Error("Error during log rotation check", "error", err) } } } } func (s *SQLiteStorage) checkAndRotate() error { s.mu.Lock() defer s.mu.Unlock() needsRotation, reason, err := s.needsRotation() if err != nil { return fmt.Errorf("error checking rotation needs: %w", err) } if needsRotation { slog.Info("Starting log rotation", "reason", reason) if err := s.rotateDatabase(); err != nil { return fmt.Errorf("error rotating database: %w", err) } slog.Info("Log rotation completed successfully") } return nil } func (s *SQLiteStorage) needsRotation() (bool, string, error) { if s.rotationCfg.MaxSizeBytes > 0 { fileInfo, err := os.Stat(s.dbPath) if err != nil { return false, "", err } if fileInfo.Size() >= s.rotationCfg.MaxSizeBytes { return true, fmt.Sprintf("file size %d >= max size %d", fileInfo.Size(), s.rotationCfg.MaxSizeBytes), nil } } if s.rotationCfg.MaxAgeHours > 0 { fileInfo, err := os.Stat(s.dbPath) if err != nil { return false, "", err } age := time.Since(fileInfo.ModTime()) if age >= s.rotationCfg.MaxAgeHours { return true, fmt.Sprintf("file age %v >= max age %v", age, s.rotationCfg.MaxAgeHours), nil } } return false, "", nil } func (s *SQLiteStorage) rotateDatabase() error { if err := s.db.Close(); err != nil { return fmt.Errorf("error closing database: %w", err) } archivePath := s.generateArchivePath() if err := os.Rename(s.dbPath, archivePath); err != nil { return fmt.Errorf("error moving database to archive: %w", err) } db, err := sql.Open("sqlite", s.dbPath) if err != nil { return fmt.Errorf("error opening new database: %w", err) } if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil { return fmt.Errorf("failed to enable WAL mode on new database: %w", err) } if err := createTables(db); err != nil { return fmt.Errorf("failed to create tables in new database: %w", err) } s.db = db if err := s.cleanupOldArchives(); err != nil { slog.Warn("Error cleaning up old archives", "error", err) } return nil } func (s *SQLiteStorage) generateArchivePath() string { dir := filepath.Dir(s.dbPath) if s.rotationCfg.ArchiveDir != "" { dir = s.rotationCfg.ArchiveDir os.MkdirAll(dir, 0755) } base := filepath.Base(s.dbPath) ext := filepath.Ext(base) name := strings.TrimSuffix(base, ext) timestamp := time.Now().Format("2006-01-02_15-04-05") archiveName := fmt.Sprintf("%s.%s%s", name, timestamp, ext) return filepath.Join(dir, archiveName) } func (s *SQLiteStorage) cleanupOldArchives() error { if s.rotationCfg.MaxFiles <= 0 { return nil } dir := filepath.Dir(s.dbPath) if s.rotationCfg.ArchiveDir != "" { dir = s.rotationCfg.ArchiveDir } base := filepath.Base(s.dbPath) ext := filepath.Ext(base) name := strings.TrimSuffix(base, ext) pattern := fmt.Sprintf("%s.*%s", name, ext) files, err := filepath.Glob(filepath.Join(dir, pattern)) if err != nil { return err } var archives []string for _, file := range files { if file != s.dbPath { archives = append(archives, file) } } sort.Slice(archives, func(i, j int) bool { infoI, _ := os.Stat(archives[i]) infoJ, _ := os.Stat(archives[j]) return infoI.ModTime().After(infoJ.ModTime()) }) if len(archives) > s.rotationCfg.MaxFiles { for _, file := range archives[s.rotationCfg.MaxFiles:] { if err := os.Remove(file); err != nil { slog.Warn("Error removing old archive", "file", file, "error", err) } else { slog.Info("Removed old archive", "file", file) } } } return nil } func (s *SQLiteStorage) ForceRotate() error { s.mu.Lock() defer s.mu.Unlock() slog.Info("Forcing log rotation") return s.rotateDatabase() } func (s *SQLiteStorage) GetRotationInfo() (map[string]any, error) { s.mu.RLock() defer s.mu.RUnlock() fileInfo, err := os.Stat(s.dbPath) if err != nil { return nil, err } info := map[string]any{ "currentSize": fileInfo.Size(), "maxSize": s.rotationCfg.MaxSizeBytes, "currentAge": time.Since(fileInfo.ModTime()).String(), "maxAge": s.rotationCfg.MaxAgeHours.String(), "maxFiles": s.rotationCfg.MaxFiles, "checkInterval": s.rotationCfg.CheckIntervalMinutes.String(), "archiveDir": s.rotationCfg.ArchiveDir, } return info, nil } func createTables(db *sql.DB) error { createTableStmt := ` CREATE TABLE IF NOT EXISTS log_entries ( id INTEGER PRIMARY KEY AUTOINCREMENT, service TEXT, timestamp DATETIME NOT NULL, type TEXT NOT NULL, host TEXT NOT NULL, tool TEXT, log_level TEXT, log_message TEXT, raw TEXT, priority TEXT, priority_name TEXT, unit TEXT, pid INTEGER, boot_id TEXT, machine_id TEXT, fields TEXT, service_information TEXT, system_metrics TEXT, tool_information TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, exported_at DATETIME );` if _, err := db.Exec(createTableStmt); err != nil { return err } indexes := []string{ "CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp);", "CREATE INDEX IF NOT EXISTS idx_service ON log_entries(service);", "CREATE INDEX IF NOT EXISTS idx_type ON log_entries(type);", "CREATE INDEX IF NOT EXISTS idx_tool ON log_entries(tool);", "CREATE INDEX IF NOT EXISTS idx_log_level ON log_entries(log_level);", "CREATE INDEX IF NOT EXISTS idx_exported ON log_entries(exported_at);", "CREATE INDEX IF NOT EXISTS idx_composite ON log_entries(timestamp, type, service);", } for _, index := range indexes { if _, err := db.Exec(index); err != nil { return fmt.Errorf("failed to create index: %w", err) } } return nil } func (s *SQLiteStorage) Store(ctx context.Context, entry *models.LogMessage) error { return s.StoreBatch(ctx, []models.LogMessage{*entry}) } func (s *SQLiteStorage) StoreBatch(ctx context.Context, entries []models.LogMessage) error { if len(entries) == 0 { return nil } s.mu.RLock() defer s.mu.RUnlock() tx, err := s.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("failed to begin transaction: %w", err) } defer tx.Rollback() stmt, err := tx.PrepareContext(ctx, ` INSERT INTO log_entries (service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`) if err != nil { return fmt.Errorf("failed to prepare statement: %w", err) } defer stmt.Close() for _, entry := range entries { fieldsJSON, _ := json.Marshal(entry.Fields) serviceInfoJSON, _ := json.Marshal(entry.ServiceInformation) systemMetricsJSON, _ := json.Marshal(entry.SystemMetrics) toolInfoJSON, _ := json.Marshal(entry.ToolInformation) _, err := stmt.ExecContext(ctx, entry.Service, entry.Timestamp, entry.Type, entry.Host, entry.Tool, entry.LogLevel, entry.LogMessage, entry.Raw, entry.Priority, entry.PriorityName, entry.Unit, entry.PID, entry.BootID, entry.MachineID, string(fieldsJSON), string(serviceInfoJSON), string(systemMetricsJSON), string(toolInfoJSON), ) if err != nil { return fmt.Errorf("failed to insert entry: %w", err) } } return tx.Commit() } func (s *SQLiteStorage) Query(ctx context.Context, query StorageQuery) ([]models.LogMessage, error) { s.mu.RLock() defer s.mu.RUnlock() sqlQuery := "SELECT service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information FROM log_entries WHERE 1=1" args := []any{} argCount := 0 if !query.StartTime.IsZero() { argCount++ sqlQuery += fmt.Sprintf(" AND timestamp >= ?%d", argCount) args = append(args, query.StartTime) } if !query.EndTime.IsZero() { argCount++ sqlQuery += fmt.Sprintf(" AND timestamp <= ?%d", argCount) args = append(args, query.EndTime) } if query.Service != "" { argCount++ sqlQuery += fmt.Sprintf(" AND service = ?%d", argCount) args = append(args, query.Service) } if query.Tool != "" { argCount++ sqlQuery += fmt.Sprintf(" AND tool = ?%d", argCount) args = append(args, query.Tool) } if query.LogLevel != "" { argCount++ sqlQuery += fmt.Sprintf(" AND log_level = ?%d", argCount) args = append(args, query.LogLevel) } if query.Type != "" { argCount++ sqlQuery += fmt.Sprintf(" AND type = ?%d", argCount) args = append(args, query.Type) } if query.OrderBy != "" { direction := "ASC" if query.OrderDesc { direction = "DESC" } sqlQuery += fmt.Sprintf(" ORDER BY %s %s", query.OrderBy, direction) } else { sqlQuery += " ORDER BY timestamp DESC" } if query.Limit > 0 { sqlQuery += fmt.Sprintf(" LIMIT %d", query.Limit) if query.Offset > 0 { sqlQuery += fmt.Sprintf(" OFFSET %d", query.Offset) } } rows, err := s.db.QueryContext(ctx, sqlQuery, args...) if err != nil { return nil, fmt.Errorf("failed to execute query: %w", err) } defer rows.Close() var entries []models.LogMessage for rows.Next() { var entry models.LogMessage var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string err := rows.Scan( &entry.Service, &entry.Timestamp, &entry.Type, &entry.Host, &entry.Tool, &entry.LogLevel, &entry.LogMessage, &entry.Raw, &entry.Priority, &entry.PriorityName, &entry.Unit, &entry.PID, &entry.BootID, &entry.MachineID, &fieldsJSON, &serviceInfoJSON, &systemMetricsJSON, &toolInfoJSON, ) if err != nil { return nil, fmt.Errorf("failed to scan row: %w", err) } if fieldsJSON != "" && fieldsJSON != "null" { json.Unmarshal([]byte(fieldsJSON), &entry.Fields) } if serviceInfoJSON != "" && serviceInfoJSON != "null" { json.Unmarshal([]byte(serviceInfoJSON), &entry.ServiceInformation) } if systemMetricsJSON != "" && systemMetricsJSON != "null" { json.Unmarshal([]byte(systemMetricsJSON), &entry.SystemMetrics) } if toolInfoJSON != "" && toolInfoJSON != "null" { json.Unmarshal([]byte(toolInfoJSON), &entry.ToolInformation) } entries = append(entries, entry) } return entries, rows.Err() } func (s *SQLiteStorage) MarkAsExported(ctx context.Context, ids []int64) error { if len(ids) == 0 { return nil } s.mu.RLock() defer s.mu.RUnlock() tx, err := s.db.BeginTx(ctx, nil) if err != nil { return err } placeholders := strings.Repeat("?,", len(ids)) placeholders = placeholders[:len(placeholders)-1] sqlQuery := fmt.Sprintf("UPDATE log_entries SET exported_at = CURRENT_TIMESTAMP WHERE id IN (%s)", placeholders) args := make([]any, len(ids)) for i, id := range ids { args[i] = id } _, err = tx.ExecContext(ctx, sqlQuery, args...) if err != nil { tx.Rollback() return err } return tx.Commit() } func (s *SQLiteStorage) GetUnexportedEntries(ctx context.Context, limit int) ([]models.LogMessage, error) { s.mu.RLock() defer s.mu.RUnlock() sqlQuery := `SELECT id, service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information FROM log_entries WHERE exported_at IS NULL ORDER BY timestamp ASC` if limit > 0 { sqlQuery += fmt.Sprintf(" LIMIT %d", limit) } rows, err := s.db.QueryContext(ctx, sqlQuery) if err != nil { return nil, fmt.Errorf("failed to execute query: %w", err) } defer rows.Close() var entries []models.LogMessage for rows.Next() { var entry models.LogMessage var id int64 var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string err := rows.Scan( &id, &entry.Service, &entry.Timestamp, &entry.Type, &entry.Host, &entry.Tool, &entry.LogLevel, &entry.LogMessage, &entry.Raw, &entry.Priority, &entry.PriorityName, &entry.Unit, &entry.PID, &entry.BootID, &entry.MachineID, &fieldsJSON, &serviceInfoJSON, &systemMetricsJSON, &toolInfoJSON, ) if err != nil { return nil, fmt.Errorf("failed to scan row: %w", err) } if entry.Fields == nil { entry.Fields = make(map[string]any) } entry.Fields["_internal_id"] = id if fieldsJSON != "" && fieldsJSON != "null" { json.Unmarshal([]byte(fieldsJSON), &entry.Fields) } if serviceInfoJSON != "" && serviceInfoJSON != "null" { json.Unmarshal([]byte(serviceInfoJSON), &entry.ServiceInformation) } if systemMetricsJSON != "" && systemMetricsJSON != "null" { json.Unmarshal([]byte(systemMetricsJSON), &entry.SystemMetrics) } if toolInfoJSON != "" && toolInfoJSON != "null" { json.Unmarshal([]byte(toolInfoJSON), &entry.ToolInformation) } entries = append(entries, entry) } return entries, rows.Err() } func (s *SQLiteStorage) Close() error { close(s.rotationStop) s.rotationWg.Wait() s.mu.Lock() defer s.mu.Unlock() return s.db.Close() }