watch-tool/local_storageV2.go

355 lines
8.8 KiB
Go

package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"strings"
_ "modernc.org/sqlite"
)
// SQLiteStorage stores and retrieves log entries through a database/sql
// handle. Instances are created by NewSQLiteStorage, which opens the handle
// with the "sqlite" driver.
type SQLiteStorage struct {
// db is the database handle used by every method of this type.
db *sql.DB
}
// NewSQLiteStorage opens the SQLite database at dbPath, switches its journal
// to write-ahead logging, and makes sure the log_entries table and its
// indexes exist.
//
// If any step after sql.Open fails, the handle is closed before the error is
// returned so that a failed construction does not leak database resources.
func NewSQLiteStorage(dbPath string) (*SQLiteStorage, error) {
	db, err := sql.Open("sqlite", dbPath)
	if err != nil {
		return nil, fmt.Errorf("failed to open SQLite database: %w", err)
	}

	if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil {
		// The setup error is the one worth reporting; a Close failure here
		// would only obscure it.
		_ = db.Close()
		return nil, fmt.Errorf("failed to enable WAL mode: %w", err)
	}

	if err := createTables(db); err != nil {
		_ = db.Close()
		return nil, fmt.Errorf("failed to create tables: %w", err)
	}

	return &SQLiteStorage{db: db}, nil
}
// createTables brings the schema up to date on db: it creates the log_entries
// table if it is missing and then each of its secondary indexes.
//
// A failure while creating the table is returned unchanged; a failure while
// creating an index is wrapped with "failed to create index". Execution stops
// at the first failing statement.
func createTables(db *sql.DB) error {
	const schema = `
CREATE TABLE IF NOT EXISTS log_entries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
service TEXT,
timestamp DATETIME NOT NULL,
type TEXT NOT NULL,
host TEXT NOT NULL,
tool TEXT,
log_level TEXT,
log_message TEXT,
raw TEXT,
priority TEXT,
priority_name TEXT,
unit TEXT,
pid INTEGER,
boot_id TEXT,
machine_id TEXT,
fields TEXT,
service_information TEXT,
system_metrics TEXT,
tool_information TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
exported_at DATETIME
);`

	_, err := db.Exec(schema)
	if err != nil {
		return err
	}

	// One index per commonly filtered column, plus a composite index over
	// (timestamp, type, service).
	for _, ddl := range [...]string{
		"CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp);",
		"CREATE INDEX IF NOT EXISTS idx_service ON log_entries(service);",
		"CREATE INDEX IF NOT EXISTS idx_type ON log_entries(type);",
		"CREATE INDEX IF NOT EXISTS idx_tool ON log_entries(tool);",
		"CREATE INDEX IF NOT EXISTS idx_log_level ON log_entries(log_level);",
		"CREATE INDEX IF NOT EXISTS idx_exported ON log_entries(exported_at);",
		"CREATE INDEX IF NOT EXISTS idx_composite ON log_entries(timestamp, type, service);",
	} {
		if _, err = db.Exec(ddl); err != nil {
			return fmt.Errorf("failed to create index: %w", err)
		}
	}

	return nil
}
// Store persists a single log entry by handing a one-element batch to
// StoreBatch, so it shares that method's transactional behavior.
//
// A nil entry is reported as an error instead of being dereferenced.
func (s *SQLiteStorage) Store(ctx context.Context, entry *LogEntry) error {
	if entry == nil {
		return fmt.Errorf("cannot store nil log entry")
	}
	return s.StoreBatch(ctx, []LogEntry{*entry})
}
// StoreBatch inserts all entries into log_entries inside one transaction.
//
// An empty batch is a no-op. The Fields, ServiceInformation, SystemMetrics
// and ToolInformation values of each entry are stored as JSON text. If any
// value cannot be encoded, or any insert fails, the whole batch is rolled
// back and the error is returned; nothing is written partially.
func (s *SQLiteStorage) StoreBatch(ctx context.Context, entries []LogEntry) error {
	if len(entries) == 0 {
		return nil
	}

	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	// After a successful Commit this Rollback is a harmless no-op; on every
	// early return it undoes the inserts made so far.
	defer tx.Rollback()

	stmt, err := tx.PrepareContext(ctx, `
INSERT INTO log_entries
(service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name,
unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
	if err != nil {
		return fmt.Errorf("failed to prepare statement: %w", err)
	}
	defer stmt.Close()

	// encode renders one structured value as JSON text for its column.
	encode := func(index int, column string, value any) (string, error) {
		data, err := json.Marshal(value)
		if err != nil {
			return "", fmt.Errorf("failed to encode %s of entry %d: %w", column, index, err)
		}
		return string(data), nil
	}

	// Index-based iteration avoids copying each LogEntry per iteration.
	for i := range entries {
		entry := &entries[i]

		fieldsJSON, err := encode(i, "fields", entry.Fields)
		if err != nil {
			return err
		}
		serviceInfoJSON, err := encode(i, "service_information", entry.ServiceInformation)
		if err != nil {
			return err
		}
		systemMetricsJSON, err := encode(i, "system_metrics", entry.SystemMetrics)
		if err != nil {
			return err
		}
		toolInfoJSON, err := encode(i, "tool_information", entry.ToolInformation)
		if err != nil {
			return err
		}

		_, err = stmt.ExecContext(ctx,
			entry.Service,
			entry.Timestamp,
			entry.Type,
			entry.Host,
			entry.Tool,
			entry.LogLevel,
			entry.LogMessage,
			entry.Raw,
			entry.Priority,
			entry.PriorityName,
			entry.Unit,
			entry.PID,
			entry.BootID,
			entry.MachineID,
			fieldsJSON,
			serviceInfoJSON,
			systemMetricsJSON,
			toolInfoJSON,
		)
		if err != nil {
			return fmt.Errorf("failed to insert entry: %w", err)
		}
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("failed to commit transaction: %w", err)
	}
	return nil
}
// Query returns the log entries that match query.
//
// Filters: each of StartTime, EndTime, Service, Tool, LogLevel and Type that
// is set (non-zero / non-empty) is ANDed into the WHERE clause as a bound
// parameter.
//
// Ordering: when OrderBy is empty, results are ordered by timestamp,
// newest first. Otherwise OrderBy must be exactly one of the log_entries
// column names; any other value is rejected with an error, because the
// column name has to be spliced into the SQL text and therefore must never
// carry arbitrary caller input. OrderDesc selects descending order.
//
// Paging: Limit > 0 caps the number of rows and Offset > 0 skips rows. An
// Offset without a Limit is honored by emitting "LIMIT -1", which SQLite
// treats as "no upper bound".
//
// The JSON columns are decoded back into the entry; NULL, empty and "null"
// values are skipped, and a value that fails to decode aborts the query with
// an error. On any error the returned slice is nil.
func (s *SQLiteStorage) Query(ctx context.Context, query StorageQuery) ([]LogEntry, error) {
	var sqlQuery strings.Builder
	sqlQuery.WriteString("SELECT service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information FROM log_entries WHERE 1=1")

	var args []any
	addFilter := func(clause string, value any) {
		sqlQuery.WriteString(" AND ")
		sqlQuery.WriteString(clause)
		args = append(args, value)
	}

	if !query.StartTime.IsZero() {
		addFilter("timestamp >= ?", query.StartTime)
	}
	if !query.EndTime.IsZero() {
		addFilter("timestamp <= ?", query.EndTime)
	}
	if query.Service != "" {
		addFilter("service = ?", query.Service)
	}
	if query.Tool != "" {
		addFilter("tool = ?", query.Tool)
	}
	if query.LogLevel != "" {
		addFilter("log_level = ?", query.LogLevel)
	}
	if query.Type != "" {
		addFilter("type = ?", query.Type)
	}

	if query.OrderBy != "" {
		// Identifiers cannot be bound as parameters, so only known column
		// names are allowed through.
		switch query.OrderBy {
		case "id", "service", "timestamp", "type", "host", "tool", "log_level",
			"log_message", "raw", "priority", "priority_name", "unit", "pid",
			"boot_id", "machine_id", "fields", "service_information",
			"system_metrics", "tool_information", "created_at", "exported_at":
		default:
			return nil, fmt.Errorf("invalid order-by column %q", query.OrderBy)
		}
		direction := "ASC"
		if query.OrderDesc {
			direction = "DESC"
		}
		fmt.Fprintf(&sqlQuery, " ORDER BY %s %s", query.OrderBy, direction)
	} else {
		sqlQuery.WriteString(" ORDER BY timestamp DESC")
	}

	switch {
	case query.Limit > 0:
		fmt.Fprintf(&sqlQuery, " LIMIT %d", query.Limit)
		if query.Offset > 0 {
			fmt.Fprintf(&sqlQuery, " OFFSET %d", query.Offset)
		}
	case query.Offset > 0:
		// SQLite only accepts OFFSET after LIMIT; a negative LIMIT means
		// "unlimited".
		fmt.Fprintf(&sqlQuery, " LIMIT -1 OFFSET %d", query.Offset)
	}

	rows, err := s.db.QueryContext(ctx, sqlQuery.String(), args...)
	if err != nil {
		return nil, fmt.Errorf("failed to execute query: %w", err)
	}
	defer rows.Close()

	// decodeJSON fills dst from one JSON text column, treating NULL, "" and
	// "null" as "nothing stored".
	decodeJSON := func(column string, raw sql.NullString, dst any) error {
		if !raw.Valid || raw.String == "" || raw.String == "null" {
			return nil
		}
		if err := json.Unmarshal([]byte(raw.String), dst); err != nil {
			return fmt.Errorf("failed to decode %s: %w", column, err)
		}
		return nil
	}

	var entries []LogEntry
	for rows.Next() {
		var entry LogEntry
		// NullString keeps rows with NULL JSON columns scannable.
		var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON sql.NullString

		err := rows.Scan(
			&entry.Service,
			&entry.Timestamp,
			&entry.Type,
			&entry.Host,
			&entry.Tool,
			&entry.LogLevel,
			&entry.LogMessage,
			&entry.Raw,
			&entry.Priority,
			&entry.PriorityName,
			&entry.Unit,
			&entry.PID,
			&entry.BootID,
			&entry.MachineID,
			&fieldsJSON,
			&serviceInfoJSON,
			&systemMetricsJSON,
			&toolInfoJSON,
		)
		if err != nil {
			return nil, fmt.Errorf("failed to scan row: %w", err)
		}

		if err := decodeJSON("fields", fieldsJSON, &entry.Fields); err != nil {
			return nil, err
		}
		if err := decodeJSON("service_information", serviceInfoJSON, &entry.ServiceInformation); err != nil {
			return nil, err
		}
		if err := decodeJSON("system_metrics", systemMetricsJSON, &entry.SystemMetrics); err != nil {
			return nil, err
		}
		if err := decodeJSON("tool_information", toolInfoJSON, &entry.ToolInformation); err != nil {
			return nil, err
		}

		entries = append(entries, entry)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to iterate rows: %w", err)
	}
	return entries, nil
}
// MarkAsExported sets exported_at to the current timestamp for every row
// whose id appears in ids.
//
// An empty ids slice is a no-op. All updates run in one transaction, so
// either every listed row is marked or none is. The ids are applied in
// fixed-size chunks because SQLite caps the number of bound parameters a
// single statement may carry (SQLITE_MAX_VARIABLE_NUMBER, 999 in older
// builds); an arbitrarily long IN list could otherwise be rejected.
func (s *SQLiteStorage) MarkAsExported(ctx context.Context, ids []int64) error {
	if len(ids) == 0 {
		return nil
	}

	// Comfortably below the smallest default parameter limit.
	const maxIDsPerStatement = 500

	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	// After a successful Commit this Rollback is a harmless no-op; on every
	// early return it undoes the updates made so far.
	defer tx.Rollback()

	for start := 0; start < len(ids); start += maxIDsPerStatement {
		end := start + maxIDsPerStatement
		if end > len(ids) {
			end = len(ids)
		}
		chunk := ids[start:end]

		// "?,?,...,?" with one placeholder per id in this chunk.
		placeholders := "?" + strings.Repeat(",?", len(chunk)-1)
		sqlQuery := fmt.Sprintf("UPDATE log_entries SET exported_at = CURRENT_TIMESTAMP WHERE id IN (%s)", placeholders)

		args := make([]any, len(chunk))
		for i, id := range chunk {
			args[i] = id
		}

		if _, err := tx.ExecContext(ctx, sqlQuery, args...); err != nil {
			return fmt.Errorf("failed to mark entries as exported: %w", err)
		}
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("failed to commit transaction: %w", err)
	}
	return nil
}
// GetUnexportedEntries returns the rows whose exported_at is still NULL,
// oldest timestamp first. A limit greater than zero caps the number of rows;
// otherwise all such rows are returned.
//
// Each returned entry carries its database row id as an int64 under
// Fields["_internal_id"]. The id is written after the stored fields have
// been decoded, so a stale "_internal_id" key inside the stored JSON can
// never replace the real row id.
// NOTE(review): presumably callers feed these ids to MarkAsExported; confirm
// against the call sites.
//
// NULL, empty and "null" JSON columns are skipped; a value that fails to
// decode aborts the call with an error. On any error the returned slice is
// nil.
func (s *SQLiteStorage) GetUnexportedEntries(ctx context.Context, limit int) ([]LogEntry, error) {
	sqlQuery := `SELECT id, service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name,
unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information
FROM log_entries WHERE exported_at IS NULL ORDER BY timestamp ASC`
	if limit > 0 {
		sqlQuery += fmt.Sprintf(" LIMIT %d", limit)
	}

	rows, err := s.db.QueryContext(ctx, sqlQuery)
	if err != nil {
		return nil, fmt.Errorf("failed to execute query: %w", err)
	}
	defer rows.Close()

	// decodeJSON fills dst from one JSON text column, treating NULL, "" and
	// "null" as "nothing stored".
	decodeJSON := func(column string, raw sql.NullString, dst any) error {
		if !raw.Valid || raw.String == "" || raw.String == "null" {
			return nil
		}
		if err := json.Unmarshal([]byte(raw.String), dst); err != nil {
			return fmt.Errorf("failed to decode %s: %w", column, err)
		}
		return nil
	}

	var entries []LogEntry
	for rows.Next() {
		var entry LogEntry
		var id int64
		// NullString keeps rows with NULL JSON columns scannable.
		var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON sql.NullString

		err := rows.Scan(
			&id,
			&entry.Service,
			&entry.Timestamp,
			&entry.Type,
			&entry.Host,
			&entry.Tool,
			&entry.LogLevel,
			&entry.LogMessage,
			&entry.Raw,
			&entry.Priority,
			&entry.PriorityName,
			&entry.Unit,
			&entry.PID,
			&entry.BootID,
			&entry.MachineID,
			&fieldsJSON,
			&serviceInfoJSON,
			&systemMetricsJSON,
			&toolInfoJSON,
		)
		if err != nil {
			return nil, fmt.Errorf("failed to scan row: %w", err)
		}

		if err := decodeJSON("fields", fieldsJSON, &entry.Fields); err != nil {
			return nil, err
		}
		if err := decodeJSON("service_information", serviceInfoJSON, &entry.ServiceInformation); err != nil {
			return nil, err
		}
		if err := decodeJSON("system_metrics", systemMetricsJSON, &entry.SystemMetrics); err != nil {
			return nil, err
		}
		if err := decodeJSON("tool_information", toolInfoJSON, &entry.ToolInformation); err != nil {
			return nil, err
		}

		// Set the row id last so it wins over any same-named key that was
		// stored in the fields JSON.
		if entry.Fields == nil {
			entry.Fields = make(map[string]any)
		}
		entry.Fields["_internal_id"] = id

		entries = append(entries, entry)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to iterate rows: %w", err)
	}
	return entries, nil
}
// Close closes the underlying database handle and returns whatever error
// that close reports.
func (s *SQLiteStorage) Close() error {
	err := s.db.Close()
	return err
}