feat: implement new logic for local storage and exporters

This commit is contained in:
Patryk Hegenberg 2025-09-23 14:14:53 +02:00
parent 491eeaabd7
commit 366aac9edc
11 changed files with 1170 additions and 33 deletions

110
config.go
View file

@ -3,10 +3,21 @@ package main
import ( import (
"fmt" "fmt"
"log" "log"
"log/slog"
"time"
"github.com/spf13/viper" "github.com/spf13/viper"
) )
// ExportConfig mirrors the "export" section of the configuration file and
// controls how stored entries are forwarded to external targets.
type ExportConfig struct {
	// Enabled switches the export pipeline on or off.
	Enabled bool `mapstructure:"enabled"`
	// BatchSize is the number of entries handled per export run.
	BatchSize int `mapstructure:"batch_size"`
	// ExportInterval is the pause between two export runs.
	ExportInterval time.Duration `mapstructure:"export_interval"`
	// RetryAttempts is the number of retries granted to a failing export.
	RetryAttempts int `mapstructure:"retry_attempts"`
	// RetryBackoff is the base delay used between retries.
	RetryBackoff time.Duration `mapstructure:"retry_backoff"`
	// HealthCheckInterval is the pause between two exporter health checks.
	HealthCheckInterval time.Duration `mapstructure:"health_check_interval"`
}
type WebConfig struct { type WebConfig struct {
Enabled bool `mapstructure:"enabled"` Enabled bool `mapstructure:"enabled"`
Port int `mapstructure:"port"` Port int `mapstructure:"port"`
@ -41,6 +52,7 @@ type ElasticsearchConfig struct {
Index string `mapstructure:"index"` Index string `mapstructure:"index"`
Username string `mapstructure:"username"` Username string `mapstructure:"username"`
Password string `mapstructure:"password"` Password string `mapstructure:"password"`
APIKey string `mapstructure:"api_key"`
Timeout int `mapstructure:"timeout"` Timeout int `mapstructure:"timeout"`
} }
@ -73,6 +85,7 @@ type SystemMetrics struct {
type Config struct { type Config struct {
Elasticsearch ElasticsearchConfig `mapstructure:"elasticsearch"` Elasticsearch ElasticsearchConfig `mapstructure:"elasticsearch"`
LocalStorage LocalStorage `mapstrucutre:"localstorage"` LocalStorage LocalStorage `mapstrucutre:"localstorage"`
Export ExportConfig `mapstructure:"export"`
Tools []ToolConfig `mapstructure:"tools"` Tools []ToolConfig `mapstructure:"tools"`
Services []ServiceConfig `mapstructure:"services"` Services []ServiceConfig `mapstructure:"services"`
PollIntervalSeconds int `mapstructure:"poll_interval_seconds"` PollIntervalSeconds int `mapstructure:"poll_interval_seconds"`
@ -123,6 +136,29 @@ func setConfigDefaults() {
viper.SetDefault("logging.level", "info") viper.SetDefault("logging.level", "info")
} }
// setConfigDefaultsV2 registers the fallback value of every optional setting
// with viper so that a sparse configuration file still yields a usable Config.
func setConfigDefaultsV2() {
	// Kept as an ordered table so the defaults can be read at a glance.
	defaults := []struct {
		key   string
		value any
	}{
		{"poll_interval_seconds", 30},
		{"elasticsearch.timeout", 30},
		{"system_metrics.enabled", true},
		{"system_metrics.collect_cpu", true},
		{"system_metrics.collect_memory", true},
		{"system_metrics.collect_disk", true},
		{"system_metrics.collect_network", false},
		{"system_metrics.disk_paths", []string{"/"}},
		{"web_service.enabled", false},
		{"web_service.port", 8080},
		{"web_service.host", "localhost"},
		{"logging.level", "info"},
		{"export.enabled", true},
		{"export.batch_size", 100},
		{"export.export_interval", "30s"},
		{"export.retry_attempts", 3},
		{"export.retry_backoff", "5s"},
		{"export.health_check_interval", "60s"},
		{"localstorage.enabled", true},
		{"localstorage.db_path", "./tixel_watch.db"},
	}
	for _, d := range defaults {
		viper.SetDefault(d.key, d.value)
	}
}
func validateConfig(cfg *Config) error { func validateConfig(cfg *Config) error {
if cfg.Elasticsearch.URL == "" { if cfg.Elasticsearch.URL == "" {
return fmt.Errorf("elasticsearch.url is required") return fmt.Errorf("elasticsearch.url is required")
@ -145,3 +181,77 @@ func validateConfig(cfg *Config) error {
return nil return nil
} }
// LoadConfigV2 reads "config.yaml" from the working directory or from
// /opt/tixel/tixel-watch/, applies the V2 defaults, decodes the result into a
// Config and validates it.
//
// It returns an error when the file cannot be read, cannot be decoded, or
// fails validateConfigV2.
func LoadConfigV2() (*Config, error) {
	viper.SetConfigName("config")
	for _, dir := range []string{".", "/opt/tixel/tixel-watch/"} {
		viper.AddConfigPath(dir)
	}
	viper.SetConfigType("yaml")

	setConfigDefaultsV2()

	err := viper.ReadInConfig()
	if err != nil {
		return nil, fmt.Errorf("error reading config: %w", err)
	}

	cfg := new(Config)
	if err = viper.Unmarshal(cfg); err != nil {
		return nil, fmt.Errorf("error parsing config: %w", err)
	}
	if err = validateConfigV2(cfg); err != nil {
		return nil, fmt.Errorf("config validation failed: %w", err)
	}

	return cfg, nil
}
// validateConfigV2 checks the settings the storage-based architecture cannot
// run without and repairs out-of-range optional values in place.
//
// It returns an error when local storage is disabled or has no database path,
// or when Elasticsearch export is enabled without a URL or an index. Invalid
// poll interval, export tuning values and tool buffer sizes are replaced by
// their defaults instead of being rejected.
func validateConfigV2(cfg *Config) error {
	storage := cfg.LocalStorage
	if !storage.Enable {
		return fmt.Errorf("local storage must be enabled in the new architecture")
	}
	if storage.DBPath == "" {
		return fmt.Errorf("local storage db_path is required")
	}

	if es := cfg.Elasticsearch; cfg.Export.Enabled && es.Enabled {
		if es.URL == "" {
			return fmt.Errorf("elasticsearch.url is required when elasticsearch export is enabled")
		}
		if es.Index == "" {
			return fmt.Errorf("elasticsearch.index is required when elasticsearch export is enabled")
		}
	}

	if interval := cfg.PollIntervalSeconds; interval <= 0 {
		slog.Warn("poll_interval_seconds is invalid, setting to 30", "value", interval)
		cfg.PollIntervalSeconds = 30
	}

	if export := &cfg.Export; export.Enabled {
		// Durations share the same "non-positive means unset" rule.
		orDefault := func(d *time.Duration, fallback time.Duration) {
			if *d <= 0 {
				*d = fallback
			}
		}

		if export.BatchSize <= 0 {
			export.BatchSize = 100
		}
		orDefault(&export.ExportInterval, 30*time.Second)
		// Zero retries is a legitimate choice; only negatives are repaired.
		if export.RetryAttempts < 0 {
			export.RetryAttempts = 3
		}
		orDefault(&export.RetryBackoff, 5*time.Second)
		orDefault(&export.HealthCheckInterval, 60*time.Second)
	}

	for i := range cfg.Tools {
		if tool := &cfg.Tools[i]; tool.BufferSize <= 0 {
			tool.BufferSize = 100
		}
	}

	return nil
}

View file

@ -20,6 +20,9 @@ func NewElasticsearchClient(config ElasticsearchConfig) (*elasticsearch.Client,
esConfig.Username = config.Username esConfig.Username = config.Username
esConfig.Password = config.Password esConfig.Password = config.Password
} }
if config.APIKey != "" {
esConfig.APIKey = config.APIKey
}
client, err := elasticsearch.NewClient(esConfig) client, err := elasticsearch.NewClient(esConfig)
if err != nil { if err != nil {

View file

@ -0,0 +1,90 @@
package main
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"strings"
"time"
"github.com/elastic/go-elasticsearch/v8"
)
// ElasticsearchExporterV2 ships batches of log entries to Elasticsearch
// through the bulk API.
type ElasticsearchExporterV2 struct {
	// client is the Elasticsearch client used for all requests.
	client *elasticsearch.Client
	// config supplies the target index and the per-request timeout in seconds.
	config ElasticsearchConfig
}
// NewElasticsearchExporterV2 builds an exporter for the given Elasticsearch
// settings. It returns an error when the underlying client cannot be created.
func NewElasticsearchExporterV2(config ElasticsearchConfig) (*ElasticsearchExporterV2, error) {
	exporter := &ElasticsearchExporterV2{config: config}

	var err error
	if exporter.client, err = NewElasticsearchClient(config); err != nil {
		return nil, fmt.Errorf("failed to create Elasticsearch client: %w", err)
	}

	return exporter, nil
}
// Export indexes entries into the configured index with a single bulk request.
//
// Entries that cannot be encoded as JSON are logged and left out of the
// request. An error is returned when the request cannot be sent, when
// Elasticsearch answers with an error status, or when the bulk response
// reports that individual documents were rejected. Because documents carry no
// explicit id, retrying after a partial failure can index the accepted
// documents a second time.
func (e *ElasticsearchExporterV2) Export(ctx context.Context, entries []LogEntry) error {
	if len(entries) == 0 {
		return nil
	}

	// Encode the action line once; going through json.Marshal keeps the index
	// name correctly escaped.
	action, err := json.Marshal(map[string]map[string]string{
		"index": {"_index": e.config.Index},
	})
	if err != nil {
		return fmt.Errorf("building bulk action line: %w", err)
	}

	var body strings.Builder
	sent := 0
	for _, entry := range entries {
		// Marshal before writing the action line: a dangling action line
		// without a document would corrupt the whole bulk body.
		data, err := json.Marshal(entry)
		if err != nil {
			slog.Error("error marshalling JSON", "error", err)
			continue
		}

		body.Write(action)
		body.WriteByte('\n')
		body.Write(data)
		body.WriteByte('\n')
		sent++
	}
	if sent == 0 {
		// Nothing could be encoded, so there is nothing to send.
		return nil
	}

	// A non-positive timeout would create an already expired context.
	if e.config.Timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, time.Duration(e.config.Timeout)*time.Second)
		defer cancel()
	}

	res, err := e.client.Bulk(
		strings.NewReader(body.String()),
		e.client.Bulk.WithContext(ctx),
	)
	if err != nil {
		return fmt.Errorf("bulk request error: %w", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		return fmt.Errorf("bulk request failed: %s", res.String())
	}

	// The bulk API answers 200 even when single documents are rejected; the
	// outcome of each item is only visible in the response body.
	var result struct {
		Errors bool `json:"errors"`
		Items  []map[string]struct {
			Status int             `json:"status"`
			Error  json.RawMessage `json:"error"`
		} `json:"items"`
	}
	if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
		return fmt.Errorf("decoding bulk response: %w", err)
	}
	if result.Errors {
		failed := 0
		firstError := ""
		for _, item := range result.Items {
			for _, outcome := range item {
				if len(outcome.Error) == 0 {
					continue
				}
				failed++
				if firstError == "" {
					firstError = string(outcome.Error)
				}
			}
		}
		return fmt.Errorf("bulk request rejected %d of %d documents, first error: %s", failed, sent, firstError)
	}

	slog.Debug("Batch successfully exported to Elasticsearch", "count", sent)
	return nil
}
// HealthCheck verifies that Elasticsearch is reachable by requesting the
// cluster info endpoint. It returns an error when the request fails or the
// server answers with an error status.
func (e *ElasticsearchExporterV2) HealthCheck(ctx context.Context) error {
	// A non-positive timeout would create an already expired context and make
	// every check fail, so the deadline is only applied when it is usable.
	if e.config.Timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, time.Duration(e.config.Timeout)*time.Second)
		defer cancel()
	}

	res, err := e.client.Info(e.client.Info.WithContext(ctx))
	if err != nil {
		return fmt.Errorf("health check failed: %w", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		return fmt.Errorf("health check failed: %s", res.String())
	}

	return nil
}

169
export_manager.go Normal file
View file

@ -0,0 +1,169 @@
package main
import (
"context"
"fmt"
"log/slog"
"maps"
"sync"
"time"
)
// ExportManager periodically reads entries from storage and hands them to
// every registered exporter.
type ExportManager struct {
	// storage is the backend entries are read from.
	storage StorageInterface
	// exporters maps a target name to its exporter; guarded by mu.
	exporters map[string]ExporterInterface
	// config holds batch, interval and retry settings.
	config ExportManagerConfig
	// mu protects exporters.
	mu sync.RWMutex
}
// ExportManagerConfig tunes the export loop of an ExportManager.
type ExportManagerConfig struct {
	// BatchSize is the maximum number of entries requested per export run.
	BatchSize int
	// ExportInterval is the period of the export ticker.
	ExportInterval time.Duration
	// RetryAttempts is the number of retries after the first failed attempt.
	RetryAttempts int
	// RetryBackoff is multiplied by the retry number to get the wait time.
	RetryBackoff time.Duration
	// HealthCheckInterval is the period of the health check ticker.
	HealthCheckInterval time.Duration
}
// NewExportManager returns a manager that reads from storage and starts out
// with no exporters registered.
func NewExportManager(storage StorageInterface, config ExportManagerConfig) *ExportManager {
	manager := new(ExportManager)
	manager.storage = storage
	manager.config = config
	manager.exporters = map[string]ExporterInterface{}
	return manager
}
// RegisterExporter adds exporter under name, replacing any exporter that was
// registered under the same name before.
func (em *ExportManager) RegisterExporter(name string, exporter ExporterInterface) {
	func() {
		em.mu.Lock()
		defer em.mu.Unlock()
		em.exporters[name] = exporter
	}()

	slog.Info("Exporter registered", "name", name)
}
// UnregisterExporter removes the exporter registered under name. Removing an
// unknown name is a no-op apart from the log line.
func (em *ExportManager) UnregisterExporter(name string) {
	func() {
		em.mu.Lock()
		defer em.mu.Unlock()
		delete(em.exporters, name)
	}()

	slog.Info("Exporter unregistered", "name", name)
}
// Start runs the export loop until ctx is cancelled. Every ExportInterval it
// exports one batch and every HealthCheckInterval it checks all registered
// exporters. The call blocks, so it is meant to run in its own goroutine.
func (em *ExportManager) Start(ctx context.Context) {
	// time.NewTicker panics on a non-positive duration, so unset intervals
	// fall back to the same defaults the configuration loader uses.
	exportInterval := em.config.ExportInterval
	if exportInterval <= 0 {
		exportInterval = 30 * time.Second
		slog.Warn("export interval is not positive, using default", "interval", exportInterval)
	}
	healthInterval := em.config.HealthCheckInterval
	if healthInterval <= 0 {
		healthInterval = 60 * time.Second
		slog.Warn("health check interval is not positive, using default", "interval", healthInterval)
	}

	exportTicker := time.NewTicker(exportInterval)
	defer exportTicker.Stop()

	healthTicker := time.NewTicker(healthInterval)
	defer healthTicker.Stop()

	slog.Info("Export manager started",
		"batch_size", em.config.BatchSize,
		"export_interval", exportInterval,
	)

	for {
		select {
		case <-ctx.Done():
			slog.Info("Export manager stopped")
			return
		case <-exportTicker.C:
			em.exportBatch(ctx)
		case <-healthTicker.C:
			em.performHealthChecks(ctx)
		}
	}
}
// exportBatch fetches up to BatchSize entries that have not been exported
// yet, sends them to every registered exporter and marks them as exported
// once at least one exporter accepted the batch.
//
// NOTE(review): an entry is marked as exported as soon as a single target
// succeeds, so targets that failed will not see that batch again.
func (em *ExportManager) exportBatch(ctx context.Context) {
	// exportTracker is the extra capability export needs on top of
	// StorageInterface. Checking for it avoids a panic when the manager is
	// wired to a backend other than the SQLite one.
	type exportTracker interface {
		GetUnexportedEntries(ctx context.Context, limit int) ([]LogEntry, error)
		MarkAsExported(ctx context.Context, ids []int64) error
	}
	tracker, ok := em.storage.(exportTracker)
	if !ok {
		slog.Error("Storage backend does not support export tracking", "type", fmt.Sprintf("%T", em.storage))
		return
	}

	// Work on a snapshot so exporters can be (un)registered during a run.
	em.mu.RLock()
	exporters := maps.Clone(em.exporters)
	em.mu.RUnlock()
	if len(exporters) == 0 {
		// Without a target there is no point in reading the database.
		return
	}

	entries, err := tracker.GetUnexportedEntries(ctx, em.config.BatchSize)
	if err != nil {
		slog.Error("Failed to get unexported entries", "error", err)
		return
	}
	if len(entries) == 0 {
		return
	}

	exported := false
	for name, exporter := range exporters {
		if err := em.exportWithRetry(ctx, name, exporter, entries); err != nil {
			slog.Error("Failed to export to target", "target", name, "error", err)
			continue
		}
		slog.Debug("Successfully exported batch", "target", name, "count", len(entries))
		exported = true
	}
	if !exported {
		return
	}

	// Only entries carrying a usable internal id can be marked.
	ids := make([]int64, 0, len(entries))
	for _, entry := range entries {
		id, ok := entry.Fields["_internal_id"].(int64)
		if !ok {
			slog.Warn("Exported entry has no internal id and cannot be marked as exported")
			continue
		}
		ids = append(ids, id)
	}

	if err := tracker.MarkAsExported(ctx, ids); err != nil {
		slog.Error("Failed to mark entries as exported", "error", err)
	}
}
// exportWithRetry sends entries to exporter and retries up to RetryAttempts
// times when the export fails. Before retry n it waits n*RetryBackoff.
//
// It returns nil on the first successful attempt, ctx.Err() when ctx is
// cancelled while waiting, and otherwise the last export error wrapped with
// the number of attempts that were made.
func (em *ExportManager) exportWithRetry(ctx context.Context, name string, exporter ExporterInterface, entries []LogEntry) error {
	// A negative setting still has to result in one attempt; otherwise the
	// loop would be skipped and a nil error would be wrapped below.
	retries := max(em.config.RetryAttempts, 0)

	var lastErr error
	for attempt := 0; attempt <= retries; attempt++ {
		if attempt > 0 {
			backoff := time.Duration(attempt) * em.config.RetryBackoff
			slog.Debug("Retrying export", "target", name, "attempt", attempt, "backoff", backoff)

			timer := time.NewTimer(backoff)
			select {
			case <-ctx.Done():
				timer.Stop()
				return ctx.Err()
			case <-timer.C:
			}
		}

		err := exporter.Export(ctx, entries)
		if err == nil {
			if attempt > 0 {
				slog.Info("Export succeeded after retry", "target", name, "attempt", attempt)
			}
			return nil
		}

		lastErr = err
		slog.Warn("Export attempt failed", "target", name, "attempt", attempt, "error", err)
	}

	// The first try counts as an attempt too, hence retries+1.
	return fmt.Errorf("export failed after %d attempts: %w", retries+1, lastErr)
}
// performHealthChecks runs HealthCheck on every registered exporter and logs
// the outcome. Failures are only reported, never acted upon.
func (em *ExportManager) performHealthChecks(ctx context.Context) {
	// Copy the registry so the checks run without holding the lock.
	em.mu.RLock()
	snapshot := maps.Clone(em.exporters)
	em.mu.RUnlock()

	for target, exp := range snapshot {
		if err := exp.HealthCheck(ctx); err != nil {
			slog.Warn("Health check failed", "target", target, "error", err)
			continue
		}
		slog.Debug("Health check passed", "target", target)
	}
}

345
local_storageV2.go Normal file
View file

@ -0,0 +1,345 @@
package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"strings"
_ "modernc.org/sqlite"
)
// SQLiteStorage persists log entries in the log_entries table of a SQLite
// database.
type SQLiteStorage struct {
	// db is the database handle all statements run on.
	db *sql.DB
}
// NewSQLiteStorage opens the SQLite database at dbPath, switches it to WAL
// journal mode and makes sure the log_entries table and its indexes exist.
//
// On any setup failure the database handle is closed again and an error is
// returned.
func NewSQLiteStorage(dbPath string) (*SQLiteStorage, error) {
	db, err := sql.Open("sqlite", dbPath)
	if err != nil {
		return nil, fmt.Errorf("failed to open SQLite database: %w", err)
	}

	if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil {
		// The setup error is the one worth reporting; a close error on a
		// handle that never became usable adds nothing.
		_ = db.Close()
		return nil, fmt.Errorf("failed to enable WAL mode: %w", err)
	}

	if err := createTables(db); err != nil {
		_ = db.Close()
		return nil, fmt.Errorf("failed to create tables: %w", err)
	}

	return &SQLiteStorage{db: db}, nil
}
// createTables creates the log_entries table and its lookup indexes unless
// they exist already. A failure on the table is returned as is; a failure on
// an index is wrapped with "failed to create index".
func createTables(db *sql.DB) error {
	const schema = `
CREATE TABLE IF NOT EXISTS log_entries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
service TEXT,
timestamp DATETIME NOT NULL,
type TEXT NOT NULL,
host TEXT NOT NULL,
tool TEXT,
log_level TEXT,
log_message TEXT,
raw TEXT,
priority TEXT,
priority_name TEXT,
unit TEXT,
pid INTEGER,
boot_id TEXT,
machine_id TEXT,
fields TEXT,
service_information TEXT,
system_metrics TEXT,
tool_information TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
exported_at DATETIME
);`

	_, err := db.Exec(schema)
	if err != nil {
		return err
	}

	// Index name and the column list it covers.
	indexes := [...][2]string{
		{"idx_timestamp", "timestamp"},
		{"idx_service", "service"},
		{"idx_type", "type"},
		{"idx_tool", "tool"},
		{"idx_log_level", "log_level"},
		{"idx_exported", "exported_at"},
		{"idx_composite", "timestamp, type, service"},
	}
	for _, idx := range indexes {
		stmt := fmt.Sprintf("CREATE INDEX IF NOT EXISTS %s ON log_entries(%s);", idx[0], idx[1])
		if _, err = db.Exec(stmt); err != nil {
			return fmt.Errorf("failed to create index: %w", err)
		}
	}

	return nil
}
// Store persists a single entry by delegating to StoreBatch. A nil entry is
// rejected with an error instead of being dereferenced.
func (s *SQLiteStorage) Store(ctx context.Context, entry *LogEntry) error {
	if entry == nil {
		return fmt.Errorf("cannot store nil log entry")
	}
	return s.StoreBatch(ctx, []LogEntry{*entry})
}
// StoreBatch inserts all entries into log_entries within one transaction.
//
// The structured parts of an entry (fields, service information, system
// metrics, tool information) are stored as JSON text. If any entry cannot be
// encoded or inserted, the transaction is rolled back, nothing is stored and
// the error is returned. An empty batch is a no-op.
func (s *SQLiteStorage) StoreBatch(ctx context.Context, entries []LogEntry) error {
	if len(entries) == 0 {
		return nil
	}

	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	// After a successful Commit this is a harmless no-op.
	defer tx.Rollback()

	stmt, err := tx.PrepareContext(ctx, `
		INSERT INTO log_entries
		(service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name,
		unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information)
		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
	if err != nil {
		return fmt.Errorf("failed to prepare statement: %w", err)
	}
	defer stmt.Close()

	// encode renders one structured column as JSON and names the column in
	// the error so a bad value can be traced.
	encode := func(column string, value any) (string, error) {
		raw, err := json.Marshal(value)
		if err != nil {
			return "", fmt.Errorf("failed to encode %s: %w", column, err)
		}
		return string(raw), nil
	}

	for i := range entries {
		entry := &entries[i]

		fieldsJSON, err := encode("fields", entry.Fields)
		if err != nil {
			return err
		}
		serviceInfoJSON, err := encode("service_information", entry.ServiceInformation)
		if err != nil {
			return err
		}
		systemMetricsJSON, err := encode("system_metrics", entry.SystemMetrics)
		if err != nil {
			return err
		}
		toolInfoJSON, err := encode("tool_information", entry.ToolInformation)
		if err != nil {
			return err
		}

		_, err = stmt.ExecContext(ctx,
			entry.Service,
			entry.Timestamp,
			entry.Type,
			entry.Host,
			entry.Tool,
			entry.LogLevel,
			entry.LogMessage,
			entry.Raw,
			entry.Priority,
			entry.PriorityName,
			entry.Unit,
			entry.PID,
			entry.BootID,
			entry.MachineID,
			fieldsJSON,
			serviceInfoJSON,
			systemMetricsJSON,
			toolInfoJSON,
		)
		if err != nil {
			return fmt.Errorf("failed to insert entry: %w", err)
		}
	}

	return tx.Commit()
}
// Query returns the entries matching query.
//
// Zero-valued filters are ignored. Results are ordered by query.OrderBy
// (ascending unless OrderDesc is set) or by timestamp descending when OrderBy
// is empty. Limit and Offset are applied when positive; an Offset without a
// Limit skips rows without capping the result.
//
// An error is returned when OrderBy is not a column of log_entries, when the
// statement fails, or when a row cannot be scanned or decoded.
func (s *SQLiteStorage) Query(ctx context.Context, query StorageQuery) ([]LogEntry, error) {
	sqlQuery := "SELECT service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information FROM log_entries WHERE 1=1"
	var args []any

	addFilter := func(condition string, value any) {
		sqlQuery += " AND " + condition
		args = append(args, value)
	}
	if !query.StartTime.IsZero() {
		addFilter("timestamp >= ?", query.StartTime)
	}
	if !query.EndTime.IsZero() {
		addFilter("timestamp <= ?", query.EndTime)
	}
	if query.Service != "" {
		addFilter("service = ?", query.Service)
	}
	if query.Tool != "" {
		addFilter("tool = ?", query.Tool)
	}
	if query.LogLevel != "" {
		addFilter("log_level = ?", query.LogLevel)
	}
	if query.Type != "" {
		addFilter("type = ?", query.Type)
	}

	if query.OrderBy == "" {
		sqlQuery += " ORDER BY timestamp DESC"
	} else {
		// Column names cannot be bound as parameters, so only known columns
		// are ever placed into the statement text.
		switch query.OrderBy {
		case "id", "service", "timestamp", "type", "host", "tool", "log_level",
			"log_message", "raw", "priority", "priority_name", "unit", "pid",
			"boot_id", "machine_id", "created_at", "exported_at":
		default:
			return nil, fmt.Errorf("unsupported order_by column %q", query.OrderBy)
		}
		direction := "ASC"
		if query.OrderDesc {
			direction = "DESC"
		}
		sqlQuery += " ORDER BY " + query.OrderBy + " " + direction
	}

	switch {
	case query.Limit > 0:
		sqlQuery += " LIMIT ?"
		args = append(args, query.Limit)
		if query.Offset > 0 {
			sqlQuery += " OFFSET ?"
			args = append(args, query.Offset)
		}
	case query.Offset > 0:
		// SQLite needs a LIMIT clause before OFFSET; a negative limit means
		// "no upper bound".
		sqlQuery += " LIMIT -1 OFFSET ?"
		args = append(args, query.Offset)
	}

	rows, err := s.db.QueryContext(ctx, sqlQuery, args...)
	if err != nil {
		return nil, fmt.Errorf("failed to execute query: %w", err)
	}
	defer rows.Close()

	// decode restores one JSON column; empty and "null" mean "not set".
	decode := func(column, raw string, target any) error {
		if raw == "" || raw == "null" {
			return nil
		}
		if err := json.Unmarshal([]byte(raw), target); err != nil {
			return fmt.Errorf("failed to decode %s: %w", column, err)
		}
		return nil
	}

	var entries []LogEntry
	for rows.Next() {
		var entry LogEntry
		var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string

		err := rows.Scan(
			&entry.Service,
			&entry.Timestamp,
			&entry.Type,
			&entry.Host,
			&entry.Tool,
			&entry.LogLevel,
			&entry.LogMessage,
			&entry.Raw,
			&entry.Priority,
			&entry.PriorityName,
			&entry.Unit,
			&entry.PID,
			&entry.BootID,
			&entry.MachineID,
			&fieldsJSON,
			&serviceInfoJSON,
			&systemMetricsJSON,
			&toolInfoJSON,
		)
		if err != nil {
			return nil, fmt.Errorf("failed to scan row: %w", err)
		}

		if err := decode("fields", fieldsJSON, &entry.Fields); err != nil {
			return nil, err
		}
		if err := decode("service_information", serviceInfoJSON, &entry.ServiceInformation); err != nil {
			return nil, err
		}
		if err := decode("system_metrics", systemMetricsJSON, &entry.SystemMetrics); err != nil {
			return nil, err
		}
		if err := decode("tool_information", toolInfoJSON, &entry.ToolInformation); err != nil {
			return nil, err
		}

		entries = append(entries, entry)
	}

	return entries, rows.Err()
}
// MarkAsExported stamps exported_at with the current time on every row whose
// id is listed in ids. An empty list is a no-op.
func (s *SQLiteStorage) MarkAsExported(ctx context.Context, ids []int64) error {
	if len(ids) == 0 {
		return nil
	}

	// One "?" placeholder and one bound argument per id.
	marks := make([]string, len(ids))
	args := make([]interface{}, len(ids))
	for i := range ids {
		marks[i] = "?"
		args[i] = ids[i]
	}

	sqlQuery := fmt.Sprintf(
		"UPDATE log_entries SET exported_at = CURRENT_TIMESTAMP WHERE id IN (%s)",
		strings.Join(marks, ","),
	)

	_, err := s.db.ExecContext(ctx, sqlQuery, args...)
	return err
}
// GetUnexportedEntries returns the oldest entries whose exported_at is still
// NULL, at most limit of them when limit is positive.
//
// The database id of each row is placed in entry.Fields["_internal_id"] as
// an int64 so the caller can hand it to MarkAsExported.
func (s *SQLiteStorage) GetUnexportedEntries(ctx context.Context, limit int) ([]LogEntry, error) {
	// The id tie-breaker keeps the order stable for equal timestamps.
	sqlQuery := `SELECT id, service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name,
		unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information
		FROM log_entries WHERE exported_at IS NULL ORDER BY timestamp ASC, id ASC`

	var args []any
	if limit > 0 {
		sqlQuery += " LIMIT ?"
		args = append(args, limit)
	}

	rows, err := s.db.QueryContext(ctx, sqlQuery, args...)
	if err != nil {
		return nil, fmt.Errorf("failed to execute query: %w", err)
	}
	defer rows.Close()

	// decode restores one JSON column on a best-effort basis. A row with an
	// undecodable column is still returned: failing here would keep that row
	// at the head of the queue and stall every later export.
	decode := func(raw string, target any) {
		if raw == "" || raw == "null" {
			return
		}
		_ = json.Unmarshal([]byte(raw), target)
	}

	var entries []LogEntry
	for rows.Next() {
		var entry LogEntry
		var id int64
		var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string

		err := rows.Scan(
			&id,
			&entry.Service,
			&entry.Timestamp,
			&entry.Type,
			&entry.Host,
			&entry.Tool,
			&entry.LogLevel,
			&entry.LogMessage,
			&entry.Raw,
			&entry.Priority,
			&entry.PriorityName,
			&entry.Unit,
			&entry.PID,
			&entry.BootID,
			&entry.MachineID,
			&fieldsJSON,
			&serviceInfoJSON,
			&systemMetricsJSON,
			&toolInfoJSON,
		)
		if err != nil {
			return nil, fmt.Errorf("failed to scan row: %w", err)
		}

		decode(fieldsJSON, &entry.Fields)
		decode(serviceInfoJSON, &entry.ServiceInformation)
		decode(systemMetricsJSON, &entry.SystemMetrics)
		decode(toolInfoJSON, &entry.ToolInformation)

		// Set the id after decoding so a stored field with the same key can
		// never replace it (and turn it into a float64).
		if entry.Fields == nil {
			entry.Fields = make(map[string]any)
		}
		entry.Fields["_internal_id"] = id

		entries = append(entries, entry)
	}

	return entries, rows.Err()
}
// Close closes the underlying database handle and returns its error, if any.
func (s *SQLiteStorage) Close() error {
	return s.db.Close()
}

View file

@ -4,20 +4,16 @@ import (
"context" "context"
"log/slog" "log/slog"
"time" "time"
"github.com/elastic/go-elasticsearch/v8"
) )
type LogProcessor struct { type LogProcessor struct {
sender ElasticsearchSender storage StorageInterface
baseIndex string
batchSize int batchSize int
} }
func NewLogProcessor(es *elasticsearch.Client, baseIndex string) *LogProcessor { func NewLogProcessor(storage StorageInterface) *LogProcessor {
return &LogProcessor{ return &LogProcessor{
sender: NewElasticsearchSender(es), storage: storage,
baseIndex: baseIndex,
batchSize: 100, batchSize: 100,
} }
} }
@ -33,7 +29,7 @@ func (lp *LogProcessor) Start(ctx context.Context, logChan <-chan LogEntry) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
if len(batch) > 0 { if len(batch) > 0 {
lp.sendBatch(batch) lp.storeBatch(ctx, batch)
} }
slog.Info("Log processor stopped") slog.Info("Log processor stopped")
return return
@ -41,7 +37,7 @@ func (lp *LogProcessor) Start(ctx context.Context, logChan <-chan LogEntry) {
case entry, ok := <-logChan: case entry, ok := <-logChan:
if !ok { if !ok {
if len(batch) > 0 { if len(batch) > 0 {
lp.sendBatch(batch) lp.storeBatch(ctx, batch)
} }
return return
} }
@ -49,30 +45,30 @@ func (lp *LogProcessor) Start(ctx context.Context, logChan <-chan LogEntry) {
batch = append(batch, entry) batch = append(batch, entry)
if len(batch) >= lp.batchSize { if len(batch) >= lp.batchSize {
lp.sendBatch(batch) lp.storeBatch(ctx, batch)
batch = batch[:0] batch = batch[:0]
} }
case <-ticker.C: case <-ticker.C:
if len(batch) > 0 { if len(batch) > 0 {
lp.sendBatch(batch) lp.storeBatch(ctx, batch)
batch = batch[:0] batch = batch[:0]
} }
} }
} }
} }
func (lp *LogProcessor) sendBatch(batch []LogEntry) { func (lp *LogProcessor) storeBatch(ctx context.Context, batch []LogEntry) {
if len(batch) == 0 { if len(batch) == 0 {
return return
} }
if err := lp.sender.SendBatch(lp.baseIndex, batch); err != nil { if err := lp.storage.StoreBatch(ctx, batch); err != nil {
slog.Error("error sending log batch", "error", err, "batch_size", len(batch)) slog.Error("error storing log batch", "error", err, "batch_size", len(batch))
return return
} }
slog.Debug("Log batch sent successfully", "batch_size", len(batch)) slog.Debug("Log batch stored successfully", "batch_size", len(batch))
} }
func (lp *LogProcessor) SetBatchSize(size int) { func (lp *LogProcessor) SetBatchSize(size int) {

79
main.go
View file

@ -21,25 +21,60 @@ func init() {
} }
func main() { func main() {
cfg, err := LoadConfig() cfg, err := LoadConfigV2()
if err != nil { if err != nil {
slog.Error("error loading configuration", "error", err) slog.Error("error loading configuration", "error", err)
os.Exit(1) os.Exit(1)
} }
slog.Info("TIXEL System Monitor started") slog.Info("TIXEL System Monitor started")
es, err := NewElasticsearchClient(cfg.Elasticsearch) var storage StorageInterface
if err != nil { if cfg.LocalStorage.Enable {
slog.Error("elasticsearch client error", "error", err) sqliteStorage, err := NewSQLiteStorage(cfg.LocalStorage.DBPath)
if err != nil {
slog.Error("failed to initialize SQLite storage", "error", err)
os.Exit(1)
}
storage = sqliteStorage
defer storage.Close()
slog.Info("SQLite storage initialized", "path", cfg.LocalStorage.DBPath)
} else {
slog.Error("Local storage is disabled, but it's required for the new architecture")
os.Exit(1) os.Exit(1)
} }
if err := TestElasticsearchConnection(es); err != nil { var exportManager *ExportManager
slog.Error("elasticsearch connection test failed", "error", err) if cfg.Export.Enabled {
os.Exit(1) exportConfig := ExportManagerConfig{
} BatchSize: cfg.Export.BatchSize,
ExportInterval: cfg.Export.ExportInterval,
RetryAttempts: cfg.Export.RetryAttempts,
RetryBackoff: cfg.Export.RetryBackoff,
HealthCheckInterval: cfg.Export.HealthCheckInterval,
}
slog.Info("Elasticsearch connection successful") exportManager = NewExportManager(storage, exportConfig)
if cfg.Elasticsearch.Enabled {
esExporter, err := NewElasticsearchExporterV2(cfg.Elasticsearch)
if err != nil {
slog.Error("failed to create Elasticsearch exporter", "error", err)
os.Exit(1)
}
if err := esExporter.HealthCheck(context.Background()); err != nil {
slog.Error("Elasticsearch health check failed", "error", err)
os.Exit(1)
}
exportManager.RegisterExporter("elasticsearch", esExporter)
slog.Info("Elasticsearch exporter registered")
}
// Add more exporters here in the future
// exportManager.RegisterExporter("checkmk", checkmkExporter)
// exportManager.RegisterExporter("grafana", grafanaExporter)
}
logChan := make(chan LogEntry, 1000) logChan := make(chan LogEntry, 1000)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -47,6 +82,21 @@ func main() {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
processor := NewLogProcessor(storage)
processor.Start(ctx, logChan)
}()
if exportManager != nil {
wg.Add(1)
go func() {
defer wg.Done()
exportManager.Start(ctx)
}()
}
for _, service := range cfg.Services { for _, service := range cfg.Services {
if !service.Enabled { if !service.Enabled {
slog.Info("Service deactivated, skipping...", "service", service.Name) slog.Info("Service deactivated, skipping...", "service", service.Name)
@ -88,23 +138,16 @@ func main() {
go func() { go func() {
defer wg.Done() defer wg.Done()
collector := NewSystemMetricsCollector(cfg.SystemMetrics, cfg.PollIntervalSeconds) collector := NewSystemMetricsCollector(cfg.SystemMetrics, cfg.PollIntervalSeconds)
collector.Start(ctx, es, cfg.Elasticsearch.Index) collector.StartV2(ctx, storage, logChan)
}() }()
slog.Info("Started collecting System-Metrics") slog.Info("Started collecting System-Metrics")
} }
wg.Add(1)
go func() {
defer wg.Done()
processor := NewLogProcessor(es, cfg.Elasticsearch.Index)
processor.Start(ctx, logChan)
}()
if cfg.WebService.Enabled { if cfg.WebService.Enabled {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
webService := NewWebService(cfg, es) webService := NewWebServiceV2(cfg, storage)
if err := webService.Start(ctx); err != nil { if err := webService.Start(ctx); err != nil {
slog.Error("web service error", "error", err) slog.Error("web service error", "error", err)
} }

View file

@ -159,6 +159,9 @@ func (jep *JournalEntryParser) Parse(jsonLine string) (LogEntry, error) {
if machineID, ok := journalData["_MACHINE_ID"].(string); ok { if machineID, ok := journalData["_MACHINE_ID"].(string); ok {
entry.MachineID = machineID entry.MachineID = machineID
} }
if hostname, ok := journalData["_HOSTNAME"].(string); ok {
entry.Host = hostname
}
entry.Raw = jsonLine entry.Raw = jsonLine

31
storage_interface.go Normal file
View file

@ -0,0 +1,31 @@
package main
import (
"context"
"time"
)
// StorageInterface is the persistence contract used by the log processor,
// the export manager and the web service.
type StorageInterface interface {
	// Store persists a single entry.
	Store(ctx context.Context, entry *LogEntry) error
	// StoreBatch persists several entries in one call.
	StoreBatch(ctx context.Context, entries []LogEntry) error
	// Query returns the entries matching query.
	Query(ctx context.Context, query StorageQuery) ([]LogEntry, error)
	// Close releases the resources held by the backend.
	Close() error
}
// StorageQuery describes which entries a StorageInterface.Query call should
// return. Zero values leave the corresponding filter or option unset.
type StorageQuery struct {
	// StartTime is the inclusive lower bound on the entry timestamp.
	StartTime time.Time
	// EndTime is the inclusive upper bound on the entry timestamp.
	EndTime time.Time
	// Service restricts the result to one service name.
	Service string
	// Tool restricts the result to one tool name.
	Tool string
	// LogLevel restricts the result to one log level.
	LogLevel string
	// Type restricts the result to one entry type.
	Type string
	// Limit caps the number of returned entries when positive.
	Limit int
	// Offset is the number of matching entries to skip.
	Offset int
	// OrderBy names the column to sort by.
	OrderBy string
	// OrderDesc selects descending order for OrderBy.
	OrderDesc bool
}
// ExporterInterface is implemented by every export target the ExportManager
// can deliver entries to.
type ExporterInterface interface {
	// Export delivers entries to the target; a non-nil error marks the whole
	// call as failed.
	Export(ctx context.Context, entries []LogEntry) error
	// HealthCheck reports whether the target is currently usable.
	HealthCheck(ctx context.Context) error
}

39
system_metricsV2.go Normal file
View file

@ -0,0 +1,39 @@
package main
import (
"context"
"log/slog"
"time"
)
// StartV2 collects system metrics once per poll interval and pushes them onto
// logChan as "system_metrics" entries until ctx is cancelled.
//
// The send never blocks: when logChan is full the sample is dropped with a
// warning. The storage argument is not used by this function.
func (smc *SystemMetricsCollector) StartV2(ctx context.Context, storage StorageInterface, logChan chan<- LogEntry) {
	period := time.Duration(smc.pollInterval) * time.Second
	ticker := time.NewTicker(period)
	defer ticker.Stop()

	for {
		// Wait for the next tick or for shutdown.
		select {
		case <-ticker.C:
		case <-ctx.Done():
			slog.Info("System metrics collector stopped")
			return
		}

		metrics, err := smc.collectMetrics()
		if err != nil {
			slog.Error("error collecting system metrics", "error", err)
			continue
		}

		sample := NewLogEntry("system_metrics")
		sample.Service = "system-metrics"
		sample.LogLevel = "Info"
		sample.SystemMetrics = metrics

		select {
		case logChan <- sample:
		case <-ctx.Done():
			return
		default:
			slog.Warn("Log channel is full, system metrics dropped")
		}
	}
}

308
web_serviceV2.go Normal file
View file

@ -0,0 +1,308 @@
package main
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"os/exec"
"slices"
"strconv"
"strings"
"time"
)
// WebServiceV2 exposes health, log, export and statistics endpoints that are
// answered from the storage backend.
type WebServiceV2 struct {
	// server is the HTTP server the routes are registered on.
	server *http.Server
	// storage is the backend all queries are run against.
	storage StorageInterface
	// config is the application configuration (listen address, services).
	config *Config
}
// LoggingMiddleware wraps next with a handler that writes a debug log line
// (remote address, method, path) for each request before passing it on.
func LoggingMiddleware(next http.Handler) http.Handler {
	logged := func(rw http.ResponseWriter, req *http.Request) {
		slog.Debug("WebService", "Remote-Address", req.RemoteAddr, "Method", req.Method, "Path", req.URL.Path)
		next.ServeHTTP(rw, req)
	}
	return http.HandlerFunc(logged)
}
// NewWebServiceV2 wires the GET routes /health, /logs, /export and /stats to
// a new service backed by storage and prepares an HTTP server listening on
// the host and port from config.WebService. The server is not started.
func NewWebServiceV2(config *Config, storage StorageInterface) *WebServiceV2 {
	ws := &WebServiceV2{storage: storage, config: config}

	routes := []struct {
		pattern string
		handler http.HandlerFunc
	}{
		{"GET /health", ws.handleHealth},
		{"GET /logs", ws.handleLogs},
		{"GET /export", ws.handleExport},
		{"GET /stats", ws.handleStats},
	}
	mux := http.NewServeMux()
	for _, route := range routes {
		mux.HandleFunc(route.pattern, route.handler)
	}

	ws.server = &http.Server{
		Addr:         fmt.Sprintf("%s:%d", config.WebService.Host, config.WebService.Port),
		Handler:      LoggingMiddleware(mux),
		ReadTimeout:  30 * time.Second,
		WriteTimeout: 300 * time.Second,
		IdleTimeout:  60 * time.Second,
	}

	return ws
}
// Start serves HTTP requests until ctx is cancelled and then shuts the server
// down gracefully, giving in-flight requests up to 10 seconds to finish.
//
// The call blocks until the shutdown has completed. It returns nil after a
// regular shutdown and an error when the server stops for any other reason,
// for example because the address is already in use.
func (ws *WebServiceV2) Start(ctx context.Context) error {
	serveDone := make(chan struct{})
	shutdownDone := make(chan struct{})

	go func() {
		defer close(shutdownDone)

		select {
		case <-ctx.Done():
		case <-serveDone:
			// The server stopped on its own; there is nothing to shut down.
			return
		}

		shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()

		if err := ws.server.Shutdown(shutdownCtx); err != nil {
			slog.Error("web service shutdown error", "error", err)
		}
	}()

	slog.Info("Starting web service", "address", ws.server.Addr)

	err := ws.server.ListenAndServe()
	close(serveDone)

	// ListenAndServe returns as soon as Shutdown begins. Waiting here keeps
	// the caller from exiting while requests are still being drained.
	<-shutdownDone

	if err != http.ErrServerClosed {
		return fmt.Errorf("web service error: %w", err)
	}

	return nil
}
// handleHealth reports the state of the storage backend and of every enabled
// service from the configuration as a JSON object keyed by component name.
//
// The response status is 503 when the storage probe fails and 200 otherwise.
// Services whose state cannot be determined are listed with status "unknown"
// rather than left out.
func (ws *WebServiceV2) handleHealth(w http.ResponseWriter, r *http.Request) {
	status := map[string]any{
		"status":    "healthy",
		"timestamp": time.Now(),
		"storage":   "sqlite",
	}
	httpStatus := http.StatusOK

	ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
	defer cancel()

	if _, err := ws.storage.Query(ctx, StorageQuery{Limit: 1}); err != nil {
		status["storage_status"] = "unhealthy"
		status["storage_error"] = err.Error()
		httpStatus = http.StatusServiceUnavailable
	} else {
		status["storage_status"] = "healthy"
	}

	statusMap := map[string]any{"tixel-watch": status}

	for _, service := range ws.config.Services {
		if !service.Enabled {
			continue
		}

		// Bound every call: sudo may block, e.g. while waiting for a password.
		cmdCtx, cancelCmd := context.WithTimeout(r.Context(), 5*time.Second)
		output, err := exec.CommandContext(cmdCtx, "sudo", "systemctl", "status", service.Name, "--no-pager").Output()
		cancelCmd()

		if err != nil {
			// systemctl status exits non-zero for units that are not running
			// but still prints their state, which is exactly what a health
			// endpoint has to report. Only give up when there is no output.
			if _, exited := err.(*exec.ExitError); !exited || len(output) == 0 {
				slog.Error("error executing status command", "service", service.Name, "error", err)
				statusMap[service.Name] = map[string]any{"status": "unknown", "error": err.Error(), "timestamp": time.Now()}
				continue
			}
		}

		found := false
		for line := range strings.SplitSeq(string(output), "\n") {
			serviceHealth, ok := strings.CutPrefix(strings.TrimSpace(line), "Active:")
			if !ok {
				continue
			}
			statusMap[service.Name] = map[string]any{"status": serviceHealth, "timestamp": time.Now()}
			found = true
			break
		}
		if !found {
			statusMap[service.Name] = map[string]any{"status": "unknown", "timestamp": time.Now()}
		}
	}

	// Headers must be set before WriteHeader, otherwise they are not sent.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(httpStatus)
	if err := json.NewEncoder(w).Encode(statusMap); err != nil {
		slog.Error("Failed to encode health response", "error", err)
	}
}
// handleLogs answers GET /logs with the entries matching the request's query
// parameters, together with their count and the effective query.
//
// A storage failure results in a 500 response. "entries" is always a JSON
// array, also when nothing matches.
func (ws *WebServiceV2) handleLogs(w http.ResponseWriter, r *http.Request) {
	query := ws.parseLogsQuery(r)

	ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
	defer cancel()

	entries, err := ws.storage.Query(ctx, query)
	if err != nil {
		http.Error(w, fmt.Sprintf("Query error: %v", err), http.StatusInternalServerError)
		return
	}
	if entries == nil {
		// A nil slice would be encoded as null instead of [].
		entries = []LogEntry{}
	}

	response := map[string]any{
		"entries": entries,
		"count":   len(entries),
		"query":   query,
	}

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(response); err != nil {
		slog.Error("Failed to encode logs response", "error", err)
	}
}
// handleExport answers GET /export with the entries matching the request's
// query parameters as a downloadable JSON document that also records when and
// with which query the export was made.
//
// A storage failure results in a 500 response. "entries" is always a JSON
// array, also when nothing matches.
func (ws *WebServiceV2) handleExport(w http.ResponseWriter, r *http.Request) {
	query := ws.parseLogsQuery(r)

	ctx, cancel := context.WithTimeout(r.Context(), 300*time.Second)
	defer cancel()

	entries, err := ws.storage.Query(ctx, query)
	if err != nil {
		http.Error(w, fmt.Sprintf("Export query error: %v", err), http.StatusInternalServerError)
		return
	}
	if entries == nil {
		// A nil slice would be encoded as null instead of [].
		entries = []LogEntry{}
	}

	filename := fmt.Sprintf("tixel_export_%s.json", time.Now().Format("20060102_150405"))
	w.Header().Set("Content-Type", "application/json")
	w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", filename))

	exportData := map[string]any{
		"export_info": map[string]any{
			"timestamp":   time.Now(),
			"entry_count": len(entries),
			"query":       query,
			"exported_by": "tixel-watch",
		},
		"entries": entries,
	}

	if err := json.NewEncoder(w).Encode(exportData); err != nil {
		slog.Error("Failed to encode export data", "error", err)
		return
	}

	slog.Info("Data exported", "count", len(entries), "query", query)
}
// handleStats answers GET /stats with the total number of stored entries, the
// number of entries from the last hour and per-type, per-service and per-tool
// counts.
//
// NOTE(review): both numbers are obtained by loading the matching entries
// through Query with no limit and counting them in memory.
func (ws *WebServiceV2) handleStats(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
	defer cancel()

	everything, err := ws.storage.Query(ctx, StorageQuery{})
	if err != nil {
		http.Error(w, fmt.Sprintf("Stats query error: %v", err), http.StatusInternalServerError)
		return
	}

	lastHour := StorageQuery{StartTime: time.Now().Add(-time.Hour)}
	recent, err := ws.storage.Query(ctx, lastHour)
	if err != nil {
		// The recent count is optional; report zero instead of failing.
		slog.Error("Failed to query recent entries", "error", err)
		recent = []LogEntry{}
	}

	stats := map[string]any{
		"total_entries":  len(everything),
		"recent_entries": len(recent),
		"timestamp":      time.Now(),
	}

	byType := map[string]int{}
	byService := map[string]int{}
	byTool := map[string]int{}
	for i := range everything {
		e := &everything[i]
		byType[e.Type]++
		if e.Service != "" {
			byService[e.Service]++
		}
		if e.Tool != "" {
			byTool[e.Tool]++
		}
	}

	stats["by_type"] = byType
	stats["by_service"] = byService
	stats["by_tool"] = byTool

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(stats)
}
// parseLogsQuery turns the URL parameters of r into a StorageQuery.
//
// Defaults are limit 100, ordered by timestamp, descending. Recognised
// parameters: limit (capped at 10000), offset, start_time and end_time
// (RFC 3339), service, tool, log_level, type, order_by (timestamp, service,
// tool, type or log_level) and order_desc ("true" for descending). Values
// that are malformed or out of range are ignored.
func (ws *WebServiceV2) parseLogsQuery(r *http.Request) StorageQuery {
	params := r.URL.Query()

	query := StorageQuery{
		Limit:     100,
		OrderBy:   "timestamp",
		OrderDesc: true,
	}

	if raw := params.Get("limit"); raw != "" {
		if n, err := strconv.Atoi(raw); err == nil && n > 0 {
			query.Limit = min(n, 10000)
		}
	}
	if raw := params.Get("offset"); raw != "" {
		if n, err := strconv.Atoi(raw); err == nil && n >= 0 {
			query.Offset = n
		}
	}

	// Timestamps are accepted only in RFC 3339 form.
	timeParams := []struct {
		key    string
		target *time.Time
	}{
		{"start_time", &query.StartTime},
		{"end_time", &query.EndTime},
	}
	for _, p := range timeParams {
		raw := params.Get(p.key)
		if raw == "" {
			continue
		}
		if parsed, err := time.Parse(time.RFC3339, raw); err == nil {
			*p.target = parsed
		}
	}

	// Plain text filters are passed on with surrounding space removed.
	textParams := []struct {
		key    string
		target *string
	}{
		{"service", &query.Service},
		{"tool", &query.Tool},
		{"log_level", &query.LogLevel},
		{"type", &query.Type},
	}
	for _, p := range textParams {
		if raw := params.Get(p.key); raw != "" {
			*p.target = strings.TrimSpace(raw)
		}
	}

	if column := params.Get("order_by"); column != "" {
		sortable := []string{"timestamp", "service", "tool", "type", "log_level"}
		if slices.Contains(sortable, column) {
			query.OrderBy = column
		}
	}
	if raw := params.Get("order_desc"); raw != "" {
		query.OrderDesc = raw == "true"
	}

	return query
}