feat: implement log-rotation for local sqlite storage
This commit is contained in:
parent
4d3782902a
commit
1f07632ae2
5 changed files with 322 additions and 28 deletions
53
config.go
53
config.go
|
|
@ -59,6 +59,7 @@ type ElasticsearchConfig struct {
|
|||
type LocalStorage struct {
|
||||
Enable bool `mapstructure:"enabled"`
|
||||
DBPath string `mapstructure:"db_path"`
|
||||
RotationConfig StorageRotationConfig `mapstructure:"rotation"`
|
||||
}
|
||||
|
||||
type SystemMetrics struct {
|
||||
|
|
@ -97,6 +98,33 @@ type Config struct {
|
|||
} `mapstructure:"logging"`
|
||||
}
|
||||
|
||||
type StorageRotationConfig struct {
|
||||
// MaxSizeBytes is the maximum size of the database in bytes (0 = deactivated)
|
||||
MaxSizeBytes int64 `mapstructure:"max_size_bytes"`
|
||||
// MaxAgeHours is the maximum age of the database (0 = deaactivated)
|
||||
MaxAgeHours time.Duration `mapstructure:"max_age_hours"`
|
||||
// MaxFiles is the maximum count of old files, to keep
|
||||
MaxFiles int `mapstructure:"max_files"`
|
||||
// CheckIntervalMinutes is the intervall for checking rotation conditions
|
||||
CheckIntervalMinutes time.Duration `mapstructure:"check_interval_minutes"`
|
||||
// ArchiveDir is the dir to store archived files (empty = same dir as db)
|
||||
ArchiveDir string `mapstructure:"archive_dir"`
|
||||
}
|
||||
|
||||
func (src StorageRotationConfig) GetMaxAge() time.Duration {
|
||||
if src.MaxAgeHours <= 0 {
|
||||
return 0
|
||||
}
|
||||
return time.Duration(src.MaxAgeHours) * time.Hour
|
||||
}
|
||||
|
||||
func (src StorageRotationConfig) GetCheckInterval() time.Duration {
|
||||
if src.CheckIntervalMinutes <= 0 {
|
||||
return 5 * time.Minute
|
||||
}
|
||||
return time.Duration(src.CheckIntervalMinutes) * time.Minute
|
||||
}
|
||||
|
||||
func LoadConfig() (*Config, error) {
|
||||
viper.SetConfigName("config")
|
||||
viper.AddConfigPath(".")
|
||||
|
|
@ -157,6 +185,11 @@ func setConfigDefaultsV2() {
|
|||
viper.SetDefault("export.health_check_interval", "60s")
|
||||
viper.SetDefault("localstorage.enabled", true)
|
||||
viper.SetDefault("localstorage.db_path", "./tixel_watch.db")
|
||||
viper.SetDefault("localstorage.rotation.max_size_bytes", int64(100*1024*1024))
|
||||
viper.SetDefault("localstorage.rotation.max_age_hours", 24)
|
||||
viper.SetDefault("localstorage.rotation.max_files", 7)
|
||||
viper.SetDefault("localstorage.rotation.check_interval_minutes", 5)
|
||||
viper.SetDefault("localstorage.rotation.archive_dir", "")
|
||||
}
|
||||
|
||||
func validateConfig(cfg *Config) error {
|
||||
|
|
@ -253,5 +286,25 @@ func validateConfigV2(cfg *Config) error {
|
|||
}
|
||||
}
|
||||
|
||||
if cfg.LocalStorage.RotationConfig.MaxSizeBytes < 0 {
|
||||
slog.Warn("Invalid rotation max_size_bytes, setting to 100MB", "value", cfg.LocalStorage.RotationConfig.MaxSizeBytes)
|
||||
cfg.LocalStorage.RotationConfig.MaxSizeBytes = 100 * 1024 * 1024
|
||||
}
|
||||
|
||||
if cfg.LocalStorage.RotationConfig.MaxAgeHours < 0 {
|
||||
slog.Warn("Invalid rotation max_age_hours, setting to 24", "value", cfg.LocalStorage.RotationConfig.MaxAgeHours)
|
||||
cfg.LocalStorage.RotationConfig.MaxAgeHours = 24
|
||||
}
|
||||
|
||||
if cfg.LocalStorage.RotationConfig.MaxFiles < 0 {
|
||||
slog.Warn("Invalid rotation max_files, setting to 7", "value", cfg.LocalStorage.RotationConfig.MaxFiles)
|
||||
cfg.LocalStorage.RotationConfig.MaxFiles = 7
|
||||
}
|
||||
|
||||
if cfg.LocalStorage.RotationConfig.CheckIntervalMinutes < 1 {
|
||||
slog.Warn("Invalid rotation check_interval_minutes, setting to 5", "value", cfg.LocalStorage.RotationConfig.CheckIntervalMinutes)
|
||||
cfg.LocalStorage.RotationConfig.CheckIntervalMinutes = 5
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,13 @@ import (
|
|||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
"tixel_watch/models"
|
||||
|
||||
_ "modernc.org/sqlite"
|
||||
|
|
@ -13,9 +19,32 @@ import (
|
|||
|
||||
type SQLiteStorage struct {
|
||||
db *sql.DB
|
||||
dbPath string
|
||||
rotationCfg StorageRotationConfig
|
||||
rotationStop chan struct{}
|
||||
rotationWg sync.WaitGroup
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func DefaultRotationConfig() StorageRotationConfig {
|
||||
return StorageRotationConfig{
|
||||
MaxSizeBytes: 100 * 1024 * 1024, // 100MB
|
||||
MaxAgeHours: 48 * time.Hour, // 48 hours
|
||||
MaxFiles: 3, // 3 old Files
|
||||
CheckIntervalMinutes: 5 * time.Minute, // check every 5 minutes
|
||||
ArchiveDir: "", // same directory
|
||||
}
|
||||
}
|
||||
|
||||
func NewSQLiteStorage(dbPath string) (*SQLiteStorage, error) {
|
||||
return NewSQLiteStorageWithRotation(dbPath, StorageRotationConfig{})
|
||||
}
|
||||
|
||||
func NewSQLiteStorageWithRotation(dbPath string, rotationCfg StorageRotationConfig) (*SQLiteStorage, error) {
|
||||
if rotationCfg.CheckIntervalMinutes == 0 {
|
||||
rotationCfg = DefaultRotationConfig()
|
||||
}
|
||||
|
||||
db, err := sql.Open("sqlite", dbPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open SQLite database: %w", err)
|
||||
|
|
@ -29,7 +58,211 @@ func NewSQLiteStorage(dbPath string) (*SQLiteStorage, error) {
|
|||
return nil, fmt.Errorf("failed to create tables: %w", err)
|
||||
}
|
||||
|
||||
return &SQLiteStorage{db: db}, nil
|
||||
storage := &SQLiteStorage{
|
||||
db: db,
|
||||
dbPath: dbPath,
|
||||
rotationCfg: rotationCfg,
|
||||
rotationStop: make(chan struct{}),
|
||||
}
|
||||
|
||||
if rotationCfg.MaxSizeBytes > 0 || rotationCfg.MaxAgeHours > 0 {
|
||||
storage.rotationWg.Add(1)
|
||||
go storage.rotationWorker()
|
||||
slog.Info("Log rotation enabled",
|
||||
"maxSize", rotationCfg.MaxSizeBytes,
|
||||
"maxAge", rotationCfg.MaxAgeHours,
|
||||
"maxFiles", rotationCfg.MaxFiles)
|
||||
}
|
||||
|
||||
return storage, nil
|
||||
}
|
||||
|
||||
func (s *SQLiteStorage) rotationWorker() {
|
||||
defer s.rotationWg.Done()
|
||||
ticker := time.NewTicker(s.rotationCfg.CheckIntervalMinutes)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.rotationStop:
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := s.checkAndRotate(); err != nil {
|
||||
slog.Error("Error during log rotation check", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SQLiteStorage) checkAndRotate() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
needsRotation, reason, err := s.needsRotation()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error checking rotation needs: %w", err)
|
||||
}
|
||||
|
||||
if needsRotation {
|
||||
slog.Info("Starting log rotation", "reason", reason)
|
||||
if err := s.rotateDatabase(); err != nil {
|
||||
return fmt.Errorf("error rotating database: %w", err)
|
||||
}
|
||||
slog.Info("Log rotation completed successfully")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SQLiteStorage) needsRotation() (bool, string, error) {
|
||||
if s.rotationCfg.MaxSizeBytes > 0 {
|
||||
fileInfo, err := os.Stat(s.dbPath)
|
||||
if err != nil {
|
||||
return false, "", err
|
||||
}
|
||||
if fileInfo.Size() >= s.rotationCfg.MaxSizeBytes {
|
||||
return true, fmt.Sprintf("file size %d >= max size %d", fileInfo.Size(), s.rotationCfg.MaxSizeBytes), nil
|
||||
}
|
||||
}
|
||||
|
||||
if s.rotationCfg.MaxAgeHours > 0 {
|
||||
fileInfo, err := os.Stat(s.dbPath)
|
||||
if err != nil {
|
||||
return false, "", err
|
||||
}
|
||||
age := time.Since(fileInfo.ModTime())
|
||||
if age >= s.rotationCfg.MaxAgeHours {
|
||||
return true, fmt.Sprintf("file age %v >= max age %v", age, s.rotationCfg.MaxAgeHours), nil
|
||||
}
|
||||
}
|
||||
|
||||
return false, "", nil
|
||||
}
|
||||
|
||||
func (s *SQLiteStorage) rotateDatabase() error {
|
||||
if err := s.db.Close(); err != nil {
|
||||
return fmt.Errorf("error closing database: %w", err)
|
||||
}
|
||||
|
||||
archivePath := s.generateArchivePath()
|
||||
|
||||
if err := os.Rename(s.dbPath, archivePath); err != nil {
|
||||
return fmt.Errorf("error moving database to archive: %w", err)
|
||||
}
|
||||
|
||||
db, err := sql.Open("sqlite", s.dbPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error opening new database: %w", err)
|
||||
}
|
||||
|
||||
if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil {
|
||||
return fmt.Errorf("failed to enable WAL mode on new database: %w", err)
|
||||
}
|
||||
|
||||
if err := createTables(db); err != nil {
|
||||
return fmt.Errorf("failed to create tables in new database: %w", err)
|
||||
}
|
||||
|
||||
s.db = db
|
||||
|
||||
if err := s.cleanupOldArchives(); err != nil {
|
||||
slog.Warn("Error cleaning up old archives", "error", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SQLiteStorage) generateArchivePath() string {
|
||||
dir := filepath.Dir(s.dbPath)
|
||||
if s.rotationCfg.ArchiveDir != "" {
|
||||
dir = s.rotationCfg.ArchiveDir
|
||||
os.MkdirAll(dir, 0755)
|
||||
}
|
||||
|
||||
base := filepath.Base(s.dbPath)
|
||||
ext := filepath.Ext(base)
|
||||
name := strings.TrimSuffix(base, ext)
|
||||
|
||||
timestamp := time.Now().Format("2006-01-02_15-04-05")
|
||||
archiveName := fmt.Sprintf("%s.%s%s", name, timestamp, ext)
|
||||
|
||||
return filepath.Join(dir, archiveName)
|
||||
}
|
||||
|
||||
func (s *SQLiteStorage) cleanupOldArchives() error {
|
||||
if s.rotationCfg.MaxFiles <= 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
dir := filepath.Dir(s.dbPath)
|
||||
if s.rotationCfg.ArchiveDir != "" {
|
||||
dir = s.rotationCfg.ArchiveDir
|
||||
}
|
||||
|
||||
base := filepath.Base(s.dbPath)
|
||||
ext := filepath.Ext(base)
|
||||
name := strings.TrimSuffix(base, ext)
|
||||
pattern := fmt.Sprintf("%s.*%s", name, ext)
|
||||
|
||||
files, err := filepath.Glob(filepath.Join(dir, pattern))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var archives []string
|
||||
for _, file := range files {
|
||||
if file != s.dbPath {
|
||||
archives = append(archives, file)
|
||||
}
|
||||
}
|
||||
|
||||
sort.Slice(archives, func(i, j int) bool {
|
||||
infoI, _ := os.Stat(archives[i])
|
||||
infoJ, _ := os.Stat(archives[j])
|
||||
return infoI.ModTime().After(infoJ.ModTime())
|
||||
})
|
||||
|
||||
if len(archives) > s.rotationCfg.MaxFiles {
|
||||
for _, file := range archives[s.rotationCfg.MaxFiles:] {
|
||||
if err := os.Remove(file); err != nil {
|
||||
slog.Warn("Error removing old archive", "file", file, "error", err)
|
||||
} else {
|
||||
slog.Info("Removed old archive", "file", file)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SQLiteStorage) ForceRotate() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
slog.Info("Forcing log rotation")
|
||||
return s.rotateDatabase()
|
||||
}
|
||||
|
||||
func (s *SQLiteStorage) GetRotationInfo() (map[string]any, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
fileInfo, err := os.Stat(s.dbPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
info := map[string]any{
|
||||
"currentSize": fileInfo.Size(),
|
||||
"maxSize": s.rotationCfg.MaxSizeBytes,
|
||||
"currentAge": time.Since(fileInfo.ModTime()).String(),
|
||||
"maxAge": s.rotationCfg.MaxAgeHours.String(),
|
||||
"maxFiles": s.rotationCfg.MaxFiles,
|
||||
"checkInterval": s.rotationCfg.CheckIntervalMinutes.String(),
|
||||
"archiveDir": s.rotationCfg.ArchiveDir,
|
||||
}
|
||||
|
||||
return info, nil
|
||||
}
|
||||
|
||||
func createTables(db *sql.DB) error {
|
||||
|
|
@ -90,6 +323,9 @@ func (s *SQLiteStorage) StoreBatch(ctx context.Context, entries []models.LogMess
|
|||
return nil
|
||||
}
|
||||
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
tx, err := s.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
||||
|
|
@ -141,6 +377,9 @@ func (s *SQLiteStorage) StoreBatch(ctx context.Context, entries []models.LogMess
|
|||
}
|
||||
|
||||
func (s *SQLiteStorage) Query(ctx context.Context, query StorageQuery) ([]models.LogMessage, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
sqlQuery := "SELECT service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information FROM log_entries WHERE 1=1"
|
||||
args := []any{}
|
||||
argCount := 0
|
||||
|
|
@ -252,6 +491,9 @@ func (s *SQLiteStorage) MarkAsExported(ctx context.Context, ids []int64) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
tx, err := s.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
@ -273,15 +515,13 @@ func (s *SQLiteStorage) MarkAsExported(ctx context.Context, ids []int64) error {
|
|||
return err
|
||||
}
|
||||
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return err
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (s *SQLiteStorage) GetUnexportedEntries(ctx context.Context, limit int) ([]models.LogMessage, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
sqlQuery := `SELECT id, service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name,
|
||||
unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information
|
||||
FROM log_entries WHERE exported_at IS NULL ORDER BY timestamp ASC`
|
||||
|
|
@ -352,5 +592,11 @@ func (s *SQLiteStorage) GetUnexportedEntries(ctx context.Context, limit int) ([]
|
|||
}
|
||||
|
||||
func (s *SQLiteStorage) Close() error {
|
||||
close(s.rotationStop)
|
||||
s.rotationWg.Wait()
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
return s.db.Close()
|
||||
}
|
||||
|
|
|
|||
12
main.go
12
main.go
|
|
@ -31,14 +31,22 @@ func main() {
|
|||
|
||||
var storage StorageInterface
|
||||
if cfg.LocalStorage.Enable {
|
||||
sqliteStorage, err := NewSQLiteStorage(cfg.LocalStorage.DBPath)
|
||||
rotationConfig := StorageRotationConfig{
|
||||
MaxSizeBytes: cfg.LocalStorage.RotationConfig.MaxSizeBytes,
|
||||
MaxAgeHours: cfg.LocalStorage.RotationConfig.GetMaxAge(),
|
||||
MaxFiles: cfg.LocalStorage.RotationConfig.MaxFiles,
|
||||
CheckIntervalMinutes: cfg.LocalStorage.RotationConfig.GetCheckInterval(),
|
||||
ArchiveDir: cfg.LocalStorage.RotationConfig.ArchiveDir,
|
||||
}
|
||||
|
||||
sqliteStorage, err := NewSQLiteStorageWithRotation(cfg.LocalStorage.DBPath, rotationConfig)
|
||||
if err != nil {
|
||||
slog.Error("failed to initialize SQLite storage", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
storage = sqliteStorage
|
||||
defer storage.Close()
|
||||
slog.Info("SQLite storage initialized", "path", cfg.LocalStorage.DBPath)
|
||||
slog.Info("SQLite storage with rotation initialized", "path", cfg.LocalStorage.DBPath)
|
||||
} else {
|
||||
slog.Error("Local storage is disabled, but it's required for the new architecture")
|
||||
os.Exit(1)
|
||||
|
|
|
|||
|
|
@ -202,6 +202,9 @@ func (jep *JournalEntryParser) extractSystemdFields(journalData map[string]any,
|
|||
for _, field := range systemdFields {
|
||||
if value, ok := journalData[field]; ok {
|
||||
esFieldName := strings.ToLower(strings.TrimPrefix(field, "_"))
|
||||
if entry.Fields == nil {
|
||||
entry.Fields = make(map[string]any)
|
||||
}
|
||||
entry.Fields[esFieldName] = value
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -75,10 +75,6 @@ func (ws *WebServiceV2) Start(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (ws *WebServiceV2) handleHealth(w http.ResponseWriter, r *http.Request) {
|
||||
// if r.Method != http.MethodGet {
|
||||
// http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
// return
|
||||
// }
|
||||
|
||||
status := map[string]any{
|
||||
"status": "healthy",
|
||||
|
|
@ -128,10 +124,6 @@ func (ws *WebServiceV2) handleHealth(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
func (ws *WebServiceV2) handleLogs(w http.ResponseWriter, r *http.Request) {
|
||||
// if r.Method != http.MethodGet {
|
||||
// http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
// return
|
||||
// }
|
||||
|
||||
query := ws.parseLogsQuery(r)
|
||||
|
||||
|
|
@ -155,10 +147,6 @@ func (ws *WebServiceV2) handleLogs(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
func (ws *WebServiceV2) handleExport(w http.ResponseWriter, r *http.Request) {
|
||||
// if r.Method != http.MethodGet {
|
||||
// http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
// return
|
||||
// }
|
||||
|
||||
query := ws.parseLogsQuery(r)
|
||||
|
||||
|
|
@ -194,10 +182,6 @@ func (ws *WebServiceV2) handleExport(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
func (ws *WebServiceV2) handleStats(w http.ResponseWriter, r *http.Request) {
|
||||
// if r.Method != http.MethodGet {
|
||||
// http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
// return
|
||||
// }
|
||||
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue