Merge branch 'dev/improve-production-readyness-and-implement-new-generic-parser'

* dev/improve-production-readyness-and-implement-new-generic-parser:
  feat: delete old specific parsers and use new generic parser in file_monitor
  feat: implement new generic parser and improve production readyness
This commit is contained in:
Patryk Hegenberg 2026-01-18 17:01:02 +01:00
commit 1d1568e3ee
34 changed files with 1604 additions and 2577 deletions

View file

@ -2,8 +2,8 @@ package main
import ( import (
"fmt" "fmt"
"log"
"log/slog" "log/slog"
"regexp"
"time" "time"
"github.com/spf13/viper" "github.com/spf13/viper"
@ -96,6 +96,7 @@ type Config struct {
Level string `mapstructure:"level"` Level string `mapstructure:"level"`
FilePath string `mapstructure:"file_path"` FilePath string `mapstructure:"file_path"`
} `mapstructure:"logging"` } `mapstructure:"logging"`
PatternsFile string `mapstructure:"patterns_file"`
} }
type StorageRotationConfig struct { type StorageRotationConfig struct {
@ -125,30 +126,6 @@ func (src StorageRotationConfig) GetCheckInterval() time.Duration {
return time.Duration(src.CheckIntervalMinutes) * time.Minute return time.Duration(src.CheckIntervalMinutes) * time.Minute
} }
func LoadConfig() (*Config, error) {
viper.SetConfigName("config")
viper.AddConfigPath(".")
viper.AddConfigPath("/opt/tixel/tixel-watch/")
viper.SetConfigType("yaml")
setConfigDefaults()
if err := viper.ReadInConfig(); err != nil {
return nil, fmt.Errorf("error reading config: %w", err)
}
var cfg Config
if err := viper.Unmarshal(&cfg); err != nil {
return nil, fmt.Errorf("error parsing config: %w", err)
}
if err := validateConfig(&cfg); err != nil {
return nil, fmt.Errorf("config validation failed: %w", err)
}
return &cfg, nil
}
func setConfigDefaults() { func setConfigDefaults() {
viper.SetDefault("poll_interval_seconds", 30) viper.SetDefault("poll_interval_seconds", 30)
viper.SetDefault("elasticsearch.timeout", 30) viper.SetDefault("elasticsearch.timeout", 30)
@ -162,21 +139,6 @@ func setConfigDefaults() {
viper.SetDefault("web_service.port", 8080) viper.SetDefault("web_service.port", 8080)
viper.SetDefault("web_service.host", "localhost") viper.SetDefault("web_service.host", "localhost")
viper.SetDefault("logging.level", "info") viper.SetDefault("logging.level", "info")
}
func setConfigDefaultsV2() {
viper.SetDefault("poll_interval_seconds", 30)
viper.SetDefault("elasticsearch.timeout", 30)
viper.SetDefault("system_metrics.enabled", true)
viper.SetDefault("system_metrics.collect_cpu", true)
viper.SetDefault("system_metrics.collect_memory", true)
viper.SetDefault("system_metrics.collect_disk", true)
viper.SetDefault("system_metrics.collect_network", false)
viper.SetDefault("system_metrics.disk_paths", []string{"/"})
viper.SetDefault("web_service.enabled", false)
viper.SetDefault("web_service.port", 8080)
viper.SetDefault("web_service.host", "localhost")
viper.SetDefault("logging.level", "info")
viper.SetDefault("export.enabled", true) viper.SetDefault("export.enabled", true)
viper.SetDefault("export.batch_size", 100) viper.SetDefault("export.batch_size", 100)
viper.SetDefault("export.export_interval", "30s") viper.SetDefault("export.export_interval", "30s")
@ -190,38 +152,16 @@ func setConfigDefaultsV2() {
viper.SetDefault("localstorage.rotation.max_files", 7) viper.SetDefault("localstorage.rotation.max_files", 7)
viper.SetDefault("localstorage.rotation.check_interval_minutes", 5) viper.SetDefault("localstorage.rotation.check_interval_minutes", 5)
viper.SetDefault("localstorage.rotation.archive_dir", "") viper.SetDefault("localstorage.rotation.archive_dir", "")
viper.SetDefault("patterns_file", "./configs/patterns.yaml")
} }
func validateConfig(cfg *Config) error { func LoadConfig() (*Config, error) {
if cfg.Elasticsearch.URL == "" {
return fmt.Errorf("elasticsearch.url is required")
}
if cfg.Elasticsearch.Index == "" {
return fmt.Errorf("elasticsearch.index is required")
}
if cfg.PollIntervalSeconds <= 0 {
log.Printf("Warning: poll_interval_seconds is %d, setting to 30", cfg.PollIntervalSeconds)
cfg.PollIntervalSeconds = 30
}
for i := range cfg.Tools {
if cfg.Tools[i].BufferSize <= 0 {
cfg.Tools[i].BufferSize = 100
}
}
return nil
}
func LoadConfigV2() (*Config, error) {
viper.SetConfigName("config") viper.SetConfigName("config")
viper.AddConfigPath(".") viper.AddConfigPath(".")
viper.AddConfigPath("/opt/tixel/tixel-watch/") viper.AddConfigPath("/opt/tixel/tixel-watch/")
viper.SetConfigType("yaml") viper.SetConfigType("yaml")
setConfigDefaultsV2() setConfigDefaults()
if err := viper.ReadInConfig(); err != nil { if err := viper.ReadInConfig(); err != nil {
return nil, fmt.Errorf("error reading config: %w", err) return nil, fmt.Errorf("error reading config: %w", err)
@ -232,14 +172,14 @@ func LoadConfigV2() (*Config, error) {
return nil, fmt.Errorf("error parsing config: %w", err) return nil, fmt.Errorf("error parsing config: %w", err)
} }
if err := validateConfigV2(&cfg); err != nil { if err := validateConfig(&cfg); err != nil {
return nil, fmt.Errorf("config validation failed: %w", err) return nil, fmt.Errorf("config validation failed: %w", err)
} }
return &cfg, nil return &cfg, nil
} }
func validateConfigV2(cfg *Config) error { func validateConfig(cfg *Config) error {
if !cfg.LocalStorage.Enable { if !cfg.LocalStorage.Enable {
return fmt.Errorf("local storage must be enabled in the new architecture") return fmt.Errorf("local storage must be enabled in the new architecture")
} }
@ -280,9 +220,15 @@ func validateConfigV2(cfg *Config) error {
} }
} }
for i := range cfg.Tools { for _, tool := range cfg.Tools {
if cfg.Tools[i].BufferSize <= 0 { if tool.BufferSize <= 0 {
cfg.Tools[i].BufferSize = 100 tool.BufferSize = 100
}
if tool.Format.Pattern != "" {
if _, err := regexp.Compile(tool.Format.Pattern); err != nil {
return fmt.Errorf("invalid regex for tool '%s': %w", tool.Name, err)
}
} }
} }

164
configs/patterns.yml Normal file
View file

@ -0,0 +1,164 @@
patterns:
# ===========================================================================
# Common / Shared Patterns
# ===========================================================================
common:
extractors:
- name: "syslog_header"
regex: '^(\w{3} \d{2} \d{2}:\d{2}:\d{2}) (?P<hostname>[^\s]+) (?P<process_info>[^:]+):\s*(?P<message_rest>.*)$'
fields:
syslog_timestamp: "time:Jan 02 15:04:05"
hostname: "string"
process_info: "string"
message_rest: "string"
- name: "timestamp_rfc3339"
regex: '(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z?)'
fields:
timestamp: "time:2006-01-02T15:04:05.000000Z"
# ===========================================================================
# TIXstream Service
# Deckt ab: tsServicePattern, tsTransferIDPattern, tsDetailPattern1-4
# ===========================================================================
tixstream:
extractors:
- name: "service_log_base"
regex: '^(?P<log_level>\S+)\s+(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6})\s+(?P<message>.*)'
fields:
log_level: "string"
timestamp: "time:2006-01-02 15:04:05.000000"
message: "string"
- name: "transfer_id_extraction"
regex: '^(?P<transfer_id>\w{8}-\w{4}-\w{4}-\w{4}-\w{12})\s+(?P<message>.*)'
fields:
transfer_id: "string"
message: "string"
- name: "transfer_start_in"
regex: 'in: Transfer start (?P<thread_info>\d+/\d+) buffers=(?P<buffers>\d+) files=(?P<file_count>\d+) size=(?P<size_mb>[0-9.]+) MByte chunksize=(?P<chunk_size>\d+) streams=(?P<streams>\d+) target-datarate=(?P<target_rate>[0-9.]+) MByte/s protocol=(?P<protocol>\w+) dest=(?P<destination>\S+) sender-id=(?P<sender_id>\S+)'
fields:
thread_info: "string" # z.B. "1/4" - Typisierung hier schwierig, also String
buffers: "int"
file_count: "int"
size_mb: "float"
chunk_size: "int"
streams: "int"
target_rate: "float"
protocol: "string"
destination: "string"
sender_id: "string"
direction: "string" # Wir können statische Felder im Parser injecten oder hier als "implizit" betrachten
- name: "transfer_start_remote_out"
regex: 'out: Start remote transfer to (?P<target>[^\s]+) request executed, duration=(?P<duration>[0-9.]+) s'
fields:
target: "string"
duration: "float"
- name: "transfer_start_out"
regex: 'out: Transfer start (?P<thread_info>\d+/\d+) buffers=(?P<buffers>\d+) files=(?P<file_count>\d+) size=(?P<size_mb>[0-9.]+) MByte chunksize=(?P<chunk_size>\d+) streams=(?P<streams>\d+) target-datarate=(?P<target_rate>[0-9.]+) MByte/s protocol=(?P<protocol>\w+) src=(?P<source>\S+) receiver=(?P<receiver>\S+)'
fields:
thread_info: "string"
buffers: "int"
file_count: "int"
size_mb: "float"
chunk_size: "int"
streams: "int"
target_rate: "float"
protocol: "string"
source: "string"
receiver: "string"
- name: "transfer_start_generic"
regex: 'out: Start transfer (?P<thread_info>\d+/\d+), src=(?P<source>[^ ]*) dest=(?P<destination>[^ ]*) item\[0\]=(?P<item0>[^ ]*) count=(?P<count>\d+)'
fields:
thread_info: "string"
source: "string"
destination: "string"
item0: "string"
count: "int"
# ===========================================================================
# Transfer Job Manager (TJM)
# Deckt ab: tjmServicePattern, tjmTransferNamePattern, tjmTransferIDPattern1/2
# ===========================================================================
transfer-job-manager:
extractors:
- name: "service_log_base"
regex: '^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\s+(?P<log_level>\S+)\s+(?P<pid>\d+).*?\[(?P<correlation_id>[^\]]*)\]\s+\[(?P<username>[^\]]*)\]\s+\[(?P<thread_id>[^\]]*)\]\s+(?P<java_class>.*?)\s+:\s+(?P<message>.*)'
fields:
timestamp: "time:2006-01-02 15:04:05.000"
log_level: "string"
pid: "int"
correlation_id: "string"
username: "string"
thread_id: "string"
java_class: "string"
message: "string"
- name: "transfer_name_info"
regex: '^(?P<transfer_name_raw>\d{8}T\d{6}-[A-Za-z0-9]+-.+?-(?:in|out)) ?: (?P<message>.*)$'
fields:
transfer_name_raw: "string"
message: "string"
- name: "transfer_id_mid"
regex: '(?P<transfer_id>\w{8}-\w{4}-\w{4}-\w{4}-\w{12}).*?(?P<message>.*)'
fields:
transfer_id: "string"
message: "string"
- name: "transfer_id_prefixed"
regex: '(?P<prefix>.*)(?P<transfer_id>\w{8}-\w{4}-\w{4}-\w{4}-\w{12}).*?(?P<message>.*)'
fields:
prefix: "string"
transfer_id: "string"
message: "string"
# ===========================================================================
# Access Manager & TCC
# Deckt ab: amServicePattern, tccServicePattern
# ===========================================================================
access-manager:
extractors:
- name: "spring_boot_log"
regex: '^(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z)\s+(?P<log_level>\w+)\s+(?P<pid>\d+)\s+---\s+\[\s*(?P<thread_id>[^\]]*)\]\s+(?P<logger>[\w\.]+)\s*:\s+(?P<message>.*)$'
fields:
timestamp: "time:2006-01-02T15:04:05.000000Z"
log_level: "string"
pid: "int"
thread_id: "string"
logger: "string"
message: "string"
tixel-control-center:
extractors:
- name: "spring_boot_log"
regex: '^(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z)\s+(?P<log_level>\w+)\s+(?P<pid>\d+)\s+---\s+\[\s*(?P<thread_id>[^\]]*)\]\s+(?P<logger>[\w\.]+)\s*:\s+(?P<message>.*)$'
fields:
timestamp: "time:2006-01-02T15:04:05.000000Z"
log_level: "string"
pid: "int"
thread_id: "string"
logger: "string"
message: "string"
# ===========================================================================
# Nginx
# Deckt ab: nginxAccessPattern
# ===========================================================================
nginx:
extractors:
- name: "access_log"
regex: '^(?P<client_ip>\S+)\s+\S+\s+(?P<remote_user>\S+)\s+\[(?P<timestamp_nginx>[^\]]+)\]\s+"(?P<request>[^"]+)"\s+(?P<status_code>\d+)\s+(?P<bytes_sent>\d+|-)\s*(?:"(?P<referer>[^"]*)"\s+"(?P<user_agent>[^"]*)")?'
fields:
client_ip: "string"
remote_user: "string"
timestamp_nginx: "string"
request: "string"
status_code: "int"
bytes_sent: "int"
referer: "string"
user_agent: "string"

View file

@ -7,7 +7,7 @@ import (
"log/slog" "log/slog"
"strings" "strings"
"time" "time"
"tixel_watch/models" "watch-tool/models"
"github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8"
) )

View file

@ -4,280 +4,87 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"log/slog" "log/slog"
"strings" "strings"
"time" "time"
"watch-tool/models"
"github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8"
) )
type ElasticsearchExporter struct { type ElasticsearchExporter struct {
client *elasticsearch.Client client *elasticsearch.Client
config ElasticsearchConfig
} }
func NewElasticsearchExporter(client *elasticsearch.Client) *ElasticsearchExporter { func NewElasticsearchExporter(config ElasticsearchConfig) (*ElasticsearchExporter, error) {
client, err := NewElasticsearchClient(config)
if err != nil {
return nil, fmt.Errorf("failed to create Elasticsearch client: %w", err)
}
return &ElasticsearchExporter{ return &ElasticsearchExporter{
client: client, client: client,
} config: config,
}, nil
} }
type ExportResult struct { func (e *ElasticsearchExporter) Export(ctx context.Context, entries []models.LogMessage) error {
Index string `json:"index"` if len(entries) == 0 {
DocumentCount int `json:"document_count"` return nil
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Duration string `json:"duration"`
Error string `json:"error,omitempty"`
}
func (e *ElasticsearchExporter) ExportToStream(ctx context.Context, indices []string, batchSize int, since int, writer io.Writer) error {
startTime := time.Now()
if _, err := writer.Write([]byte("{\n \"export_info\": {\n")); err != nil {
return fmt.Errorf("error writing export header: %w", err)
} }
exportInfo := map[string]any{ var body strings.Builder
"timestamp": startTime, for _, entry := range entries {
"indices": indices, indexName := e.config.Index
"batch_size": batchSize,
"sinceDays": since,
}
infoBytes, err := json.MarshalIndent(exportInfo, " ", " ") indexLine := fmt.Sprintf(`{"index":{"_index":"%s"}}`, indexName)
if err != nil { body.WriteString(indexLine)
return fmt.Errorf("error marshalling export info: %w", err) body.WriteString("\n")
}
infoStr := string(infoBytes) data, err := json.Marshal(entry)
infoStr = strings.TrimPrefix(infoStr, "{") if err != nil {
infoStr = strings.TrimSuffix(infoStr, "}") slog.Error("error marshalling JSON", "error", err)
continue
if _, err := writer.Write([]byte(infoStr)); err != nil {
return fmt.Errorf("error writing export info: %w", err)
}
if _, err := writer.Write([]byte("\n },\n \"data\": {\n")); err != nil {
return fmt.Errorf("error writing data header: %w", err)
}
results := make([]ExportResult, 0, len(indices))
first := true
for _, index := range indices {
if !first {
if _, err := writer.Write([]byte(",\n")); err != nil {
return fmt.Errorf("error writing separator: %w", err)
}
}
first = false
result := e.exportIndex(ctx, index, batchSize, since, writer)
results = append(results, result)
if result.Error != "" {
slog.Error("error exporting index", "index", index, "error", result.Error)
} }
body.WriteString(string(data))
body.WriteString("\n")
} }
if _, err := writer.Write([]byte("\n },\n \"results\": ")); err != nil { timeout := time.Duration(e.config.Timeout) * time.Second
return fmt.Errorf("error writing results header: %w", err) ctx, cancel := context.WithTimeout(ctx, timeout)
} defer cancel()
if err := json.NewEncoder(writer).Encode(results); err != nil { res, err := e.client.Bulk(
return fmt.Errorf("error writing results: %w", err) strings.NewReader(body.String()),
} e.client.Bulk.WithContext(ctx),
if _, err := writer.Write([]byte("}\n")); err != nil {
return fmt.Errorf("error writing final bracket: %w", err)
}
duration := time.Since(startTime)
slog.Info("Export completed", "duration", duration, "indices_count", len(indices))
return nil
}
func (e *ElasticsearchExporter) exportIndex(ctx context.Context, index string, batchSize int, since int, writer io.Writer) ExportResult {
startTime := time.Now()
result := ExportResult{
Index: index,
StartTime: startTime,
}
if _, err := fmt.Fprintf(writer, " \"%s\": [\n", index); err != nil {
result.Error = fmt.Sprintf("error writing index header: %v", err)
result.EndTime = time.Now()
result.Duration = time.Since(startTime).String()
return result
}
query := `{"query":{"match_all":{}}}`
if since > 0 {
query = fmt.Sprintf(`{
"query": {
"range": {
"timestamp": {
"gte": "now-%dd/d",
"lt": "now/d"
}
}
}
}`, since)
}
res, err := e.client.Search(
e.client.Search.WithContext(ctx),
e.client.Search.WithIndex(index),
e.client.Search.WithScroll(1000),
e.client.Search.WithSize(batchSize),
e.client.Search.WithBody(strings.NewReader(query)),
) )
if err != nil { if err != nil {
result.Error = fmt.Sprintf("error in initial search: %v", err) return fmt.Errorf("bulk request error: %w", err)
result.EndTime = time.Now()
result.Duration = time.Since(startTime).String()
return result
} }
defer res.Body.Close() defer res.Body.Close()
if res.IsError() { if res.IsError() {
result.Error = fmt.Sprintf("elasticsearch error: %s", res.String()) return fmt.Errorf("bulk request failed: %s", res.String())
result.EndTime = time.Now()
result.Duration = time.Since(startTime).String()
return result
} }
var searchResult map[string]any slog.Debug("Batch successfully exported to Elasticsearch", "count", len(entries))
if err := json.NewDecoder(res.Body).Decode(&searchResult); err != nil { return nil
result.Error = fmt.Sprintf("error decoding search result: %v", err)
result.EndTime = time.Now()
result.Duration = time.Since(startTime).String()
return result
}
scrollID, ok := searchResult["_scroll_id"].(string)
if !ok {
result.Error = "no scroll_id found in search result"
result.EndTime = time.Now()
result.Duration = time.Since(startTime).String()
return result
}
hits := searchResult["hits"].(map[string]any)["hits"].([]any)
firstDocument := true
documentCount := 0
for _, hit := range hits {
if !firstDocument {
if _, err := writer.Write([]byte(",\n")); err != nil {
result.Error = fmt.Sprintf("error writing separator: %v", err)
result.EndTime = time.Now()
result.Duration = time.Since(startTime).String()
return result
}
}
firstDocument = false
source := hit.(map[string]any)["_source"]
if err := e.writeDocument(writer, source); err != nil {
result.Error = fmt.Sprintf("error writing document: %v", err)
result.EndTime = time.Now()
result.Duration = time.Since(startTime).String()
return result
}
documentCount++
}
for {
select {
case <-ctx.Done():
result.Error = "context cancelled"
result.EndTime = time.Now()
result.Duration = time.Since(startTime).String()
return result
default:
}
scrollRes, err := e.client.Scroll(
e.client.Scroll.WithScrollID(scrollID),
e.client.Scroll.WithScroll(1000),
e.client.Scroll.WithContext(ctx),
)
if err != nil {
result.Error = fmt.Sprintf("error in scroll request: %v", err)
break
}
defer scrollRes.Body.Close()
if scrollRes.IsError() {
result.Error = fmt.Sprintf("elasticsearch scroll error: %s", scrollRes.String())
break
}
var scrollResult map[string]any
if err := json.NewDecoder(scrollRes.Body).Decode(&scrollResult); err != nil {
result.Error = fmt.Sprintf("error decoding scroll result: %v", err)
break
}
hits := scrollResult["hits"].(map[string]any)["hits"].([]any)
if len(hits) == 0 {
break
}
scrollID, _ = scrollResult["_scroll_id"].(string)
for _, hit := range hits {
if _, err := writer.Write([]byte(",\n")); err != nil {
result.Error = fmt.Sprintf("error writing separator: %v", err)
result.EndTime = time.Now()
result.Duration = time.Since(startTime).String()
return result
}
source := hit.(map[string]any)["_source"]
if err := e.writeDocument(writer, source); err != nil {
result.Error = fmt.Sprintf("error writing document: %v", err)
result.EndTime = time.Now()
result.Duration = time.Since(startTime).String()
return result
}
documentCount++
}
}
if _, err := writer.Write([]byte("\n ]")); err != nil {
if result.Error == "" {
result.Error = fmt.Sprintf("error writing index footer: %v", err)
}
}
result.DocumentCount = documentCount
result.EndTime = time.Now()
result.Duration = time.Since(startTime).String()
slog.Info("Index export completed",
"index", index,
"documents", documentCount,
"duration", result.Duration,
)
return result
} }
func (e *ElasticsearchExporter) writeDocument(writer io.Writer, document any) error { func (e *ElasticsearchExporter) HealthCheck(ctx context.Context) error {
jsonBytes, err := json.MarshalIndent(document, " ", " ") timeout := time.Duration(e.config.Timeout) * time.Second
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
res, err := e.client.Info(e.client.Info.WithContext(ctx))
if err != nil { if err != nil {
return fmt.Errorf("error marshalling document: %w", err) return fmt.Errorf("health check failed: %w", err)
} }
defer res.Body.Close()
if _, err := writer.Write([]byte(" ")); err != nil { if res.IsError() {
return err return fmt.Errorf("health check failed: %s", res.String())
}
if _, err := writer.Write(jsonBytes); err != nil {
return err
} }
return nil return nil

View file

@ -1,91 +0,0 @@
package main
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"strings"
"time"
"tixel_watch/models"
"github.com/elastic/go-elasticsearch/v8"
)
type ElasticsearchExporterV2 struct {
client *elasticsearch.Client
config ElasticsearchConfig
}
func NewElasticsearchExporterV2(config ElasticsearchConfig) (*ElasticsearchExporterV2, error) {
client, err := NewElasticsearchClient(config)
if err != nil {
return nil, fmt.Errorf("failed to create Elasticsearch client: %w", err)
}
return &ElasticsearchExporterV2{
client: client,
config: config,
}, nil
}
func (e *ElasticsearchExporterV2) Export(ctx context.Context, entries []models.LogMessage) error {
if len(entries) == 0 {
return nil
}
var body strings.Builder
for _, entry := range entries {
indexName := e.config.Index
indexLine := fmt.Sprintf(`{"index":{"_index":"%s"}}`, indexName)
body.WriteString(indexLine)
body.WriteString("\n")
data, err := json.Marshal(entry)
if err != nil {
slog.Error("error marshalling JSON", "error", err)
continue
}
body.WriteString(string(data))
body.WriteString("\n")
}
timeout := time.Duration(e.config.Timeout) * time.Second
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
res, err := e.client.Bulk(
strings.NewReader(body.String()),
e.client.Bulk.WithContext(ctx),
)
if err != nil {
return fmt.Errorf("bulk request error: %w", err)
}
defer res.Body.Close()
if res.IsError() {
return fmt.Errorf("bulk request failed: %s", res.String())
}
slog.Debug("Batch successfully exported to Elasticsearch", "count", len(entries))
return nil
}
func (e *ElasticsearchExporterV2) HealthCheck(ctx context.Context) error {
timeout := time.Duration(e.config.Timeout) * time.Second
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
res, err := e.client.Info(e.client.Info.WithContext(ctx))
if err != nil {
return fmt.Errorf("health check failed: %w", err)
}
defer res.Body.Close()
if res.IsError() {
return fmt.Errorf("health check failed: %s", res.String())
}
return nil
}

View file

@ -8,7 +8,7 @@ import (
"strings" "strings"
"sync" "sync"
"time" "time"
"tixel_watch/models" "watch-tool/models"
) )
type ExportManager struct { type ExportManager struct {

View file

@ -2,7 +2,7 @@ package main
import ( import (
"context" "context"
"tixel_watch/models" "watch-tool/models"
) )
type ExporterInterface interface { type ExporterInterface interface {

View file

@ -6,43 +6,52 @@ import (
"log/slog" "log/slog"
"regexp" "regexp"
"strings" "strings"
"tixel_watch/models" "watch-tool/models"
"tixel_watch/parser" "watch-tool/parser"
"watch-tool/patterns"
"github.com/hpcloud/tail" "github.com/hpcloud/tail"
) )
type FileMonitor struct { type FileMonitor struct {
config ToolConfig config ToolConfig
parser parser.Parser parser parser.Parser
hostname string
} }
func NewFileMonitor(config ToolConfig) *FileMonitor { func NewFileMonitor(config ToolConfig, hostname string) *FileMonitor {
var logParser parser.Parser var logParser parser.Parser
if config.Format.Pattern != "" { if config.Format.Pattern != "" {
pattern, err := regexp.Compile(config.Format.Pattern) compiledRegex, err := regexp.Compile(config.Format.Pattern)
if err != nil { if err != nil {
slog.Error("invalid regex pattern", "tool", config.Name, "error", err) slog.Error("Invalid regex pattern in tool config", "tool", config.Name, "error", err)
logParser = &parser.DefaultParser{} logParser = parser.NewGenericParser(config.Name, hostname)
} else { } else {
logParser = &parser.RegexLogParser{ gp := parser.NewGenericParser(config.Name, hostname)
Pattern: pattern,
Fields: config.Format.Fields, customExtractor := patterns.CompiledExtractor{
Toolname: config.Name, Name: "config_custom_pattern",
Pattern: compiledRegex,
Fields: config.Format.Fields,
} }
gp.Extractors = append(gp.Extractors, customExtractor)
logParser = gp
} }
} else { } else {
var err error var err error
logParser, err = parser.New(config.Name, "custom") logParser, err = parser.New(config.Name, "custom", hostname)
if err != nil { if err != nil {
slog.Error("cannot get tool specific parser", "error", err) slog.Error("Cannot get tool specific parser from factory", "error", err)
logParser = parser.NewGenericParser(config.Name, hostname)
} }
} }
return &FileMonitor{ return &FileMonitor{
config: config, config: config,
parser: logParser, parser: logParser,
hostname: hostname,
} }
} }
@ -72,7 +81,7 @@ func (fm *FileMonitor) Start(ctx context.Context, out chan<- models.LogMessage)
} }
if line.Err != nil { if line.Err != nil {
slog.Error("error reading log file", "tool", fm.config.Name, "error", line.Err) slog.Error("Error reading log file", "tool", fm.config.Name, "error", line.Err)
continue continue
} }
@ -82,7 +91,11 @@ func (fm *FileMonitor) Start(ctx context.Context, out chan<- models.LogMessage)
entry, err := fm.parser.Parse(line.Text) entry, err := fm.parser.Parse(line.Text)
if err != nil { if err != nil {
slog.Error("error parsing log line", "error", err) slog.Error("Error parsing log line", "tool", fm.config.Name, "error", err)
} else {
if entry.Tool == "" {
entry.Tool = fm.config.Name
}
} }
select { select {

4
go.mod
View file

@ -1,4 +1,4 @@
module tixel_watch module watch-tool
go 1.24.1 go 1.24.1
@ -8,6 +8,7 @@ require (
github.com/shirou/gopsutil v3.21.11+incompatible github.com/shirou/gopsutil v3.21.11+incompatible
github.com/spf13/viper v1.20.1 github.com/spf13/viper v1.20.1
golang.org/x/sys v0.34.0 golang.org/x/sys v0.34.0
gopkg.in/yaml.v3 v3.0.1
modernc.org/sqlite v1.39.0 modernc.org/sqlite v1.39.0
) )
@ -42,7 +43,6 @@ require (
golang.org/x/text v0.21.0 // indirect golang.org/x/text v0.21.0 // indirect
gopkg.in/fsnotify.v1 v1.4.7 // indirect gopkg.in/fsnotify.v1 v1.4.7 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
modernc.org/libc v1.66.3 // indirect modernc.org/libc v1.66.3 // indirect
modernc.org/mathutil v1.7.1 // indirect modernc.org/mathutil v1.7.1 // indirect
modernc.org/memory v1.11.0 // indirect modernc.org/memory v1.11.0 // indirect

View file

@ -3,11 +3,10 @@ package helpers
import ( import (
"fmt" "fmt"
"log/slog" "log/slog"
"os"
"regexp" "regexp"
"strings" "strings"
"time" "time"
"tixel_watch/models" "watch-tool/models"
) )
var ( var (
@ -76,11 +75,3 @@ func ParseSyslogTimeToRFC3339(syslogTime string) (time.Time, error) {
t = t.AddDate(now.Year(), 0, 0) t = t.AddDate(now.Year(), 0, 0)
return t, nil return t, nil
} }
func GetHostname() (string, error) {
hostname, err := os.Hostname()
if err != nil {
hostname = "unknown"
}
return hostname, nil
}

47
helpers/utils.go Normal file
View file

@ -0,0 +1,47 @@
package helpers
import (
"context"
"fmt"
"log/slog"
"runtime/debug"
)
type AppError struct {
Op string
Err error
Context string
}
func (e *AppError) Error() string {
if e.Context != "" {
return fmt.Sprintf("%s: %v (%s)", e.Op, e.Err, e.Context)
}
return fmt.Sprintf("%s: %v", e.Op, e.Err)
}
func (e *AppError) Unwrap() error {
return e.Err
}
func NewAppError(op string, err error, ctx string) error {
return &AppError{Op: op, Err: err, Context: ctx}
}
func SafeGo(ctx context.Context, name string, fn func()) {
go func() {
defer func() {
if r := recover(); r != nil {
stack := string(debug.Stack())
slog.Error("CRITICAL: Panic recovered in goroutine",
"goroutine", name,
"panic", r,
"stack", stack,
)
// Optional: Hier könnte man Metriken inkrementieren (siehe Observability)
}
}()
fn()
}()
}

View file

@ -4,28 +4,277 @@ import (
"context" "context"
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"tixel_watch/models" "fmt"
"log/slog"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"watch-tool/models"
_ "modernc.org/sqlite" _ "modernc.org/sqlite"
) )
type StorageService struct { type SQLiteStorage struct {
db *sql.DB db *sql.DB
dbPath string
rotationCfg StorageRotationConfig
rotationStop chan struct{}
rotationWg sync.WaitGroup
mu sync.RWMutex
} }
func NewStorageService(dbPath string) (*StorageService, error) { func DefaultRotationConfig() StorageRotationConfig {
db, err := sql.Open("sqlite", dbPath) return StorageRotationConfig{
MaxSizeBytes: 100 * 1024 * 1024, // 100MB
MaxAgeHours: 48 * time.Hour, // 48 hours
MaxFiles: 3, // 3 old Files
CheckIntervalMinutes: 5 * time.Minute, // check every 5 minutes
ArchiveDir: "", // same directory
}
}
func NewSQLiteStorage(dbPath string) (*SQLiteStorage, error) {
return NewSQLiteStorageWithRotation(dbPath, StorageRotationConfig{})
}
func NewSQLiteStorageWithRotation(dbPath string, rotationCfg StorageRotationConfig) (*SQLiteStorage, error) {
if rotationCfg.CheckIntervalMinutes == 0 {
rotationCfg = DefaultRotationConfig()
}
dsn := fmt.Sprintf("%s?_busy_timeout=5000&_journal_mode=WAL", dbPath)
db, err := sql.Open("sqlite", dsn)
if err != nil {
return nil, fmt.Errorf("failed to open SQLite database: %w", err)
}
if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil {
return nil, fmt.Errorf("failed to enable WAL mode: %w", err)
}
if err := createTables(db); err != nil {
return nil, fmt.Errorf("failed to create tables: %w", err)
}
storage := &SQLiteStorage{
db: db,
dbPath: dbPath,
rotationCfg: rotationCfg,
rotationStop: make(chan struct{}),
}
if rotationCfg.MaxSizeBytes > 0 || rotationCfg.MaxAgeHours > 0 {
storage.rotationWg.Add(1)
go storage.rotationWorker()
slog.Info("Log rotation enabled",
"maxSize", rotationCfg.MaxSizeBytes,
"maxAge", rotationCfg.MaxAgeHours,
"maxFiles", rotationCfg.MaxFiles)
}
return storage, nil
}
func (s *SQLiteStorage) rotationWorker() {
defer s.rotationWg.Done()
ticker := time.NewTicker(s.rotationCfg.CheckIntervalMinutes)
defer ticker.Stop()
for {
select {
case <-s.rotationStop:
return
case <-ticker.C:
if err := s.checkAndRotate(); err != nil {
slog.Error("Error during log rotation check", "error", err)
}
}
}
}
func (s *SQLiteStorage) checkAndRotate() error {
s.mu.Lock()
defer s.mu.Unlock()
needsRotation, reason, err := s.needsRotation()
if err != nil {
return fmt.Errorf("error checking rotation needs: %w", err)
}
if needsRotation {
slog.Info("Starting log rotation", "reason", reason)
if err := s.rotateDatabase(); err != nil {
return fmt.Errorf("error rotating database: %w", err)
}
slog.Info("Log rotation completed successfully")
}
return nil
}
func (s *SQLiteStorage) needsRotation() (bool, string, error) {
if s.rotationCfg.MaxSizeBytes > 0 {
fileInfo, err := os.Stat(s.dbPath)
if err != nil {
return false, "", err
}
if fileInfo.Size() >= s.rotationCfg.MaxSizeBytes {
return true, fmt.Sprintf("file size %d >= max size %d", fileInfo.Size(), s.rotationCfg.MaxSizeBytes), nil
}
}
if s.rotationCfg.MaxAgeHours > 0 {
fileInfo, err := os.Stat(s.dbPath)
if err != nil {
return false, "", err
}
age := time.Since(fileInfo.ModTime())
if age >= s.rotationCfg.MaxAgeHours {
return true, fmt.Sprintf("file age %v >= max age %v", age, s.rotationCfg.MaxAgeHours), nil
}
}
return false, "", nil
}
func (s *SQLiteStorage) rotateDatabase() error {
if err := s.db.Close(); err != nil {
return fmt.Errorf("error closing database: %w", err)
}
archivePath := s.generateArchivePath()
if err := os.Rename(s.dbPath, archivePath); err != nil {
return fmt.Errorf("error moving database to archive: %w", err)
}
db, err := sql.Open("sqlite", s.dbPath)
if err != nil {
return fmt.Errorf("error opening new database: %w", err)
}
if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil {
return fmt.Errorf("failed to enable WAL mode on new database: %w", err)
}
if err := createTables(db); err != nil {
return fmt.Errorf("failed to create tables in new database: %w", err)
}
s.db = db
if err := s.cleanupOldArchives(); err != nil {
slog.Warn("Error cleaning up old archives", "error", err)
}
return nil
}
func (s *SQLiteStorage) generateArchivePath() string {
dir := filepath.Dir(s.dbPath)
if s.rotationCfg.ArchiveDir != "" {
dir = s.rotationCfg.ArchiveDir
os.MkdirAll(dir, 0755)
}
base := filepath.Base(s.dbPath)
ext := filepath.Ext(base)
name := strings.TrimSuffix(base, ext)
timestamp := time.Now().Format("2006-01-02_15-04-05")
archiveName := fmt.Sprintf("%s.%s%s", name, timestamp, ext)
return filepath.Join(dir, archiveName)
}
func (s *SQLiteStorage) cleanupOldArchives() error {
if s.rotationCfg.MaxFiles <= 0 {
return nil
}
dir := filepath.Dir(s.dbPath)
if s.rotationCfg.ArchiveDir != "" {
dir = s.rotationCfg.ArchiveDir
}
base := filepath.Base(s.dbPath)
ext := filepath.Ext(base)
name := strings.TrimSuffix(base, ext)
pattern := fmt.Sprintf("%s.*%s", name, ext)
files, err := filepath.Glob(filepath.Join(dir, pattern))
if err != nil {
return err
}
var archives []string
for _, file := range files {
if file != s.dbPath {
archives = append(archives, file)
}
}
sort.Slice(archives, func(i, j int) bool {
infoI, _ := os.Stat(archives[i])
infoJ, _ := os.Stat(archives[j])
return infoI.ModTime().After(infoJ.ModTime())
})
if len(archives) > s.rotationCfg.MaxFiles {
for _, file := range archives[s.rotationCfg.MaxFiles:] {
if err := os.Remove(file); err != nil {
slog.Warn("Error removing old archive", "file", file, "error", err)
} else {
slog.Info("Removed old archive", "file", file)
}
}
}
return nil
}
func (s *SQLiteStorage) ForceRotate() error {
s.mu.Lock()
defer s.mu.Unlock()
slog.Info("Forcing log rotation")
return s.rotateDatabase()
}
func (s *SQLiteStorage) GetRotationInfo() (map[string]any, error) {
s.mu.RLock()
defer s.mu.RUnlock()
fileInfo, err := os.Stat(s.dbPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
info := map[string]any{
"currentSize": fileInfo.Size(),
"maxSize": s.rotationCfg.MaxSizeBytes,
"currentAge": time.Since(fileInfo.ModTime()).String(),
"maxAge": s.rotationCfg.MaxAgeHours.String(),
"maxFiles": s.rotationCfg.MaxFiles,
"checkInterval": s.rotationCfg.CheckIntervalMinutes.String(),
"archiveDir": s.rotationCfg.ArchiveDir,
}
return info, nil
}
func createTables(db *sql.DB) error {
createTableStmt := ` createTableStmt := `
CREATE TABLE IF NOT EXISTS log_entries ( CREATE TABLE IF NOT EXISTS log_entries (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
service TEXT, service TEXT,
timestamp DATETIME, timestamp DATETIME NOT NULL,
type TEXT, type TEXT NOT NULL,
host TEXT, host TEXT NOT NULL,
tool TEXT, tool TEXT,
log_level TEXT, log_level TEXT,
log_message TEXT, log_message TEXT,
@ -39,143 +288,317 @@ func NewStorageService(dbPath string) (*StorageService, error) {
fields TEXT, fields TEXT,
service_information TEXT, service_information TEXT,
system_metrics TEXT, system_metrics TEXT,
tool_information TEXT tool_information TEXT,
); created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
` exported_at DATETIME
_, err = db.ExecContext(context.Background(), createTableStmt) );`
if err != nil {
return nil, err if _, err := db.Exec(createTableStmt); err != nil {
return err
} }
return &StorageService{db: db}, nil indexes := []string{
"CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp);",
"CREATE INDEX IF NOT EXISTS idx_service ON log_entries(service);",
"CREATE INDEX IF NOT EXISTS idx_type ON log_entries(type);",
"CREATE INDEX IF NOT EXISTS idx_tool ON log_entries(tool);",
"CREATE INDEX IF NOT EXISTS idx_log_level ON log_entries(log_level);",
"CREATE INDEX IF NOT EXISTS idx_exported ON log_entries(exported_at);",
"CREATE INDEX IF NOT EXISTS idx_composite ON log_entries(timestamp, type, service);",
}
for _, index := range indexes {
if _, err := db.Exec(index); err != nil {
return fmt.Errorf("failed to create index: %w", err)
}
}
return nil
} }
func (s *StorageService) Close() error { func (s *SQLiteStorage) Store(ctx context.Context, entry *models.LogMessage) error {
return s.StoreBatch(ctx, []models.LogMessage{*entry})
}
func (s *SQLiteStorage) StoreBatch(ctx context.Context, entries []models.LogMessage) error {
if len(entries) == 0 {
return nil
}
s.mu.RLock()
defer s.mu.RUnlock()
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback()
stmt, err := tx.PrepareContext(ctx, `
INSERT INTO log_entries
(service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name,
unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
if err != nil {
return fmt.Errorf("failed to prepare statement: %w", err)
}
defer stmt.Close()
for _, entry := range entries {
fieldsJSON, _ := json.Marshal(entry.Fields)
serviceInfoJSON, _ := json.Marshal(entry.ServiceInformation)
systemMetricsJSON, _ := json.Marshal(entry.SystemMetrics)
toolInfoJSON, _ := json.Marshal(entry.ToolInformation)
_, err := stmt.ExecContext(ctx,
entry.Service,
entry.Timestamp,
entry.Type,
entry.Host,
entry.Tool,
entry.LogLevel,
entry.LogMessage,
entry.Raw,
entry.Priority,
entry.PriorityName,
entry.Unit,
entry.PID,
entry.BootID,
entry.MachineID,
string(fieldsJSON),
string(serviceInfoJSON),
string(systemMetricsJSON),
string(toolInfoJSON),
)
if err != nil {
return fmt.Errorf("failed to insert entry: %w", err)
}
}
return tx.Commit()
}
func (s *SQLiteStorage) Query(ctx context.Context, query StorageQuery) ([]models.LogMessage, error) {
s.mu.RLock()
defer s.mu.RUnlock()
sqlQuery := "SELECT service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information FROM log_entries WHERE 1=1"
args := []any{}
argCount := 0
if !query.StartTime.IsZero() {
argCount++
sqlQuery += fmt.Sprintf(" AND timestamp >= ?%d", argCount)
args = append(args, query.StartTime)
}
if !query.EndTime.IsZero() {
argCount++
sqlQuery += fmt.Sprintf(" AND timestamp <= ?%d", argCount)
args = append(args, query.EndTime)
}
if query.Service != "" {
argCount++
sqlQuery += fmt.Sprintf(" AND service = ?%d", argCount)
args = append(args, query.Service)
}
if query.Tool != "" {
argCount++
sqlQuery += fmt.Sprintf(" AND tool = ?%d", argCount)
args = append(args, query.Tool)
}
if query.LogLevel != "" {
argCount++
sqlQuery += fmt.Sprintf(" AND log_level = ?%d", argCount)
args = append(args, query.LogLevel)
}
if query.Type != "" {
argCount++
sqlQuery += fmt.Sprintf(" AND type = ?%d", argCount)
args = append(args, query.Type)
}
if query.OrderBy != "" {
direction := "ASC"
if query.OrderDesc {
direction = "DESC"
}
sqlQuery += fmt.Sprintf(" ORDER BY %s %s", query.OrderBy, direction)
} else {
sqlQuery += " ORDER BY timestamp DESC"
}
if query.Limit > 0 {
sqlQuery += fmt.Sprintf(" LIMIT %d", query.Limit)
if query.Offset > 0 {
sqlQuery += fmt.Sprintf(" OFFSET %d", query.Offset)
}
}
rows, err := s.db.QueryContext(ctx, sqlQuery, args...)
if err != nil {
return nil, fmt.Errorf("failed to execute query: %w", err)
}
defer rows.Close()
var entries []models.LogMessage
for rows.Next() {
var entry models.LogMessage
var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string
err := rows.Scan(
&entry.Service,
&entry.Timestamp,
&entry.Type,
&entry.Host,
&entry.Tool,
&entry.LogLevel,
&entry.LogMessage,
&entry.Raw,
&entry.Priority,
&entry.PriorityName,
&entry.Unit,
&entry.PID,
&entry.BootID,
&entry.MachineID,
&fieldsJSON,
&serviceInfoJSON,
&systemMetricsJSON,
&toolInfoJSON,
)
if err != nil {
return nil, fmt.Errorf("failed to scan row: %w", err)
}
if fieldsJSON != "" && fieldsJSON != "null" {
json.Unmarshal([]byte(fieldsJSON), &entry.Fields)
}
if serviceInfoJSON != "" && serviceInfoJSON != "null" {
json.Unmarshal([]byte(serviceInfoJSON), &entry.ServiceInformation)
}
if systemMetricsJSON != "" && systemMetricsJSON != "null" {
json.Unmarshal([]byte(systemMetricsJSON), &entry.SystemMetrics)
}
if toolInfoJSON != "" && toolInfoJSON != "null" {
json.Unmarshal([]byte(toolInfoJSON), &entry.ToolInformation)
}
entries = append(entries, entry)
}
return entries, rows.Err()
}
func (s *SQLiteStorage) MarkAsExported(ctx context.Context, ids []int64) error {
if len(ids) == 0 {
return nil
}
s.mu.RLock()
defer s.mu.RUnlock()
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
}
placeholders := strings.Repeat("?,", len(ids))
placeholders = placeholders[:len(placeholders)-1]
sqlQuery := fmt.Sprintf("UPDATE log_entries SET exported_at = CURRENT_TIMESTAMP WHERE id IN (%s)", placeholders)
args := make([]any, len(ids))
for i, id := range ids {
args[i] = id
}
_, err = tx.ExecContext(ctx, sqlQuery, args...)
if err != nil {
tx.Rollback()
return err
}
return tx.Commit()
}
func (s *SQLiteStorage) GetUnexportedEntries(ctx context.Context, limit int) ([]models.LogMessage, error) {
s.mu.RLock()
defer s.mu.RUnlock()
sqlQuery := `SELECT id, service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name,
unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information
FROM log_entries WHERE exported_at IS NULL ORDER BY timestamp ASC`
if limit > 0 {
sqlQuery += fmt.Sprintf(" LIMIT %d", limit)
}
rows, err := s.db.QueryContext(ctx, sqlQuery)
if err != nil {
return nil, fmt.Errorf("failed to execute query: %w", err)
}
defer rows.Close()
var entries []models.LogMessage
for rows.Next() {
var entry models.LogMessage
var id int64
var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string
err := rows.Scan(
&id,
&entry.Service,
&entry.Timestamp,
&entry.Type,
&entry.Host,
&entry.Tool,
&entry.LogLevel,
&entry.LogMessage,
&entry.Raw,
&entry.Priority,
&entry.PriorityName,
&entry.Unit,
&entry.PID,
&entry.BootID,
&entry.MachineID,
&fieldsJSON,
&serviceInfoJSON,
&systemMetricsJSON,
&toolInfoJSON,
)
if err != nil {
return nil, fmt.Errorf("failed to scan row: %w", err)
}
if entry.Fields == nil {
entry.Fields = make(map[string]any)
}
entry.Fields["_internal_id"] = id
if fieldsJSON != "" && fieldsJSON != "null" {
json.Unmarshal([]byte(fieldsJSON), &entry.Fields)
}
if serviceInfoJSON != "" && serviceInfoJSON != "null" {
json.Unmarshal([]byte(serviceInfoJSON), &entry.ServiceInformation)
}
if systemMetricsJSON != "" && systemMetricsJSON != "null" {
json.Unmarshal([]byte(systemMetricsJSON), &entry.SystemMetrics)
}
if toolInfoJSON != "" && toolInfoJSON != "null" {
json.Unmarshal([]byte(toolInfoJSON), &entry.ToolInformation)
}
entries = append(entries, entry)
}
return entries, rows.Err()
}
func (s *SQLiteStorage) Close() error {
close(s.rotationStop)
s.rotationWg.Wait()
s.mu.Lock()
defer s.mu.Unlock()
return s.db.Close() return s.db.Close()
} }
func (s *StorageService) SaveLogEntry(ctx context.Context, entry *models.LogMessage) error {
fieldsJSON := ""
if entry.Fields != nil {
b, err := json.Marshal(entry.Fields)
if err != nil {
return err
}
fieldsJSON = string(b)
}
serviceInfoJSON := ""
if entry.ServiceInformation != nil {
b, err := json.Marshal(entry.ServiceInformation)
if err != nil {
return err
}
serviceInfoJSON = string(b)
}
systemMetricsJSON := ""
if entry.SystemMetrics != nil {
b, err := json.Marshal(entry.SystemMetrics)
if err != nil {
return err
}
systemMetricsJSON = string(b)
}
toolInfoJSON := ""
if entry.ToolInformation != nil {
b, err := json.Marshal(entry.ToolInformation)
if err != nil {
return err
}
toolInfoJSON = string(b)
}
stmt := `
INSERT INTO log_entries
(service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`
_, err := s.db.ExecContext(ctx, stmt,
entry.Service,
entry.Timestamp,
entry.Type,
entry.Host,
entry.Tool,
entry.LogLevel,
entry.LogMessage,
entry.Raw,
entry.Priority,
entry.PriorityName,
entry.Unit,
entry.PID,
entry.BootID,
entry.MachineID,
fieldsJSON,
serviceInfoJSON,
systemMetricsJSON,
toolInfoJSON,
)
return err
}
func (s *StorageService) LoadLogEntry(ctx context.Context, id int64) (*models.LogMessage, error) {
row := s.db.QueryRowContext(ctx, "SELECT service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information FROM log_entries WHERE id = ?", id)
var entry models.LogMessage
var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string
err := row.Scan(
&entry.Service,
&entry.Timestamp,
&entry.Type,
&entry.Host,
&entry.Tool,
&entry.LogLevel,
&entry.LogMessage,
&entry.Raw,
&entry.Priority,
&entry.PriorityName,
&entry.Unit,
&entry.PID,
&entry.BootID,
&entry.MachineID,
&fieldsJSON,
&serviceInfoJSON,
&systemMetricsJSON,
&toolInfoJSON,
)
if err != nil {
return nil, err
}
if fieldsJSON != "" {
var fields map[string]any
if err = json.Unmarshal([]byte(fieldsJSON), &fields); err == nil {
entry.Fields = fields
}
}
if serviceInfoJSON != "" {
var si any
if err = json.Unmarshal([]byte(serviceInfoJSON), &si); err == nil {
entry.ServiceInformation = si
}
}
if systemMetricsJSON != "" {
var sm any
if err = json.Unmarshal([]byte(systemMetricsJSON), &sm); err == nil {
entry.SystemMetrics = sm
}
}
if toolInfoJSON != "" {
var ti any
if err = json.Unmarshal([]byte(toolInfoJSON), &ti); err == nil {
entry.ToolInformation = ti
}
}
return &entry, nil
}

View file

@ -1,602 +0,0 @@
package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log/slog"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"tixel_watch/models"
_ "modernc.org/sqlite"
)
type SQLiteStorage struct {
db *sql.DB
dbPath string
rotationCfg StorageRotationConfig
rotationStop chan struct{}
rotationWg sync.WaitGroup
mu sync.RWMutex
}
func DefaultRotationConfig() StorageRotationConfig {
return StorageRotationConfig{
MaxSizeBytes: 100 * 1024 * 1024, // 100MB
MaxAgeHours: 48 * time.Hour, // 48 hours
MaxFiles: 3, // 3 old Files
CheckIntervalMinutes: 5 * time.Minute, // check every 5 minutes
ArchiveDir: "", // same directory
}
}
func NewSQLiteStorage(dbPath string) (*SQLiteStorage, error) {
return NewSQLiteStorageWithRotation(dbPath, StorageRotationConfig{})
}
func NewSQLiteStorageWithRotation(dbPath string, rotationCfg StorageRotationConfig) (*SQLiteStorage, error) {
if rotationCfg.CheckIntervalMinutes == 0 {
rotationCfg = DefaultRotationConfig()
}
db, err := sql.Open("sqlite", dbPath)
if err != nil {
return nil, fmt.Errorf("failed to open SQLite database: %w", err)
}
if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil {
return nil, fmt.Errorf("failed to enable WAL mode: %w", err)
}
if err := createTables(db); err != nil {
return nil, fmt.Errorf("failed to create tables: %w", err)
}
storage := &SQLiteStorage{
db: db,
dbPath: dbPath,
rotationCfg: rotationCfg,
rotationStop: make(chan struct{}),
}
if rotationCfg.MaxSizeBytes > 0 || rotationCfg.MaxAgeHours > 0 {
storage.rotationWg.Add(1)
go storage.rotationWorker()
slog.Info("Log rotation enabled",
"maxSize", rotationCfg.MaxSizeBytes,
"maxAge", rotationCfg.MaxAgeHours,
"maxFiles", rotationCfg.MaxFiles)
}
return storage, nil
}
func (s *SQLiteStorage) rotationWorker() {
defer s.rotationWg.Done()
ticker := time.NewTicker(s.rotationCfg.CheckIntervalMinutes)
defer ticker.Stop()
for {
select {
case <-s.rotationStop:
return
case <-ticker.C:
if err := s.checkAndRotate(); err != nil {
slog.Error("Error during log rotation check", "error", err)
}
}
}
}
func (s *SQLiteStorage) checkAndRotate() error {
s.mu.Lock()
defer s.mu.Unlock()
needsRotation, reason, err := s.needsRotation()
if err != nil {
return fmt.Errorf("error checking rotation needs: %w", err)
}
if needsRotation {
slog.Info("Starting log rotation", "reason", reason)
if err := s.rotateDatabase(); err != nil {
return fmt.Errorf("error rotating database: %w", err)
}
slog.Info("Log rotation completed successfully")
}
return nil
}
func (s *SQLiteStorage) needsRotation() (bool, string, error) {
if s.rotationCfg.MaxSizeBytes > 0 {
fileInfo, err := os.Stat(s.dbPath)
if err != nil {
return false, "", err
}
if fileInfo.Size() >= s.rotationCfg.MaxSizeBytes {
return true, fmt.Sprintf("file size %d >= max size %d", fileInfo.Size(), s.rotationCfg.MaxSizeBytes), nil
}
}
if s.rotationCfg.MaxAgeHours > 0 {
fileInfo, err := os.Stat(s.dbPath)
if err != nil {
return false, "", err
}
age := time.Since(fileInfo.ModTime())
if age >= s.rotationCfg.MaxAgeHours {
return true, fmt.Sprintf("file age %v >= max age %v", age, s.rotationCfg.MaxAgeHours), nil
}
}
return false, "", nil
}
func (s *SQLiteStorage) rotateDatabase() error {
if err := s.db.Close(); err != nil {
return fmt.Errorf("error closing database: %w", err)
}
archivePath := s.generateArchivePath()
if err := os.Rename(s.dbPath, archivePath); err != nil {
return fmt.Errorf("error moving database to archive: %w", err)
}
db, err := sql.Open("sqlite", s.dbPath)
if err != nil {
return fmt.Errorf("error opening new database: %w", err)
}
if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil {
return fmt.Errorf("failed to enable WAL mode on new database: %w", err)
}
if err := createTables(db); err != nil {
return fmt.Errorf("failed to create tables in new database: %w", err)
}
s.db = db
if err := s.cleanupOldArchives(); err != nil {
slog.Warn("Error cleaning up old archives", "error", err)
}
return nil
}
func (s *SQLiteStorage) generateArchivePath() string {
dir := filepath.Dir(s.dbPath)
if s.rotationCfg.ArchiveDir != "" {
dir = s.rotationCfg.ArchiveDir
os.MkdirAll(dir, 0755)
}
base := filepath.Base(s.dbPath)
ext := filepath.Ext(base)
name := strings.TrimSuffix(base, ext)
timestamp := time.Now().Format("2006-01-02_15-04-05")
archiveName := fmt.Sprintf("%s.%s%s", name, timestamp, ext)
return filepath.Join(dir, archiveName)
}
func (s *SQLiteStorage) cleanupOldArchives() error {
if s.rotationCfg.MaxFiles <= 0 {
return nil
}
dir := filepath.Dir(s.dbPath)
if s.rotationCfg.ArchiveDir != "" {
dir = s.rotationCfg.ArchiveDir
}
base := filepath.Base(s.dbPath)
ext := filepath.Ext(base)
name := strings.TrimSuffix(base, ext)
pattern := fmt.Sprintf("%s.*%s", name, ext)
files, err := filepath.Glob(filepath.Join(dir, pattern))
if err != nil {
return err
}
var archives []string
for _, file := range files {
if file != s.dbPath {
archives = append(archives, file)
}
}
sort.Slice(archives, func(i, j int) bool {
infoI, _ := os.Stat(archives[i])
infoJ, _ := os.Stat(archives[j])
return infoI.ModTime().After(infoJ.ModTime())
})
if len(archives) > s.rotationCfg.MaxFiles {
for _, file := range archives[s.rotationCfg.MaxFiles:] {
if err := os.Remove(file); err != nil {
slog.Warn("Error removing old archive", "file", file, "error", err)
} else {
slog.Info("Removed old archive", "file", file)
}
}
}
return nil
}
func (s *SQLiteStorage) ForceRotate() error {
s.mu.Lock()
defer s.mu.Unlock()
slog.Info("Forcing log rotation")
return s.rotateDatabase()
}
func (s *SQLiteStorage) GetRotationInfo() (map[string]any, error) {
s.mu.RLock()
defer s.mu.RUnlock()
fileInfo, err := os.Stat(s.dbPath)
if err != nil {
return nil, err
}
info := map[string]any{
"currentSize": fileInfo.Size(),
"maxSize": s.rotationCfg.MaxSizeBytes,
"currentAge": time.Since(fileInfo.ModTime()).String(),
"maxAge": s.rotationCfg.MaxAgeHours.String(),
"maxFiles": s.rotationCfg.MaxFiles,
"checkInterval": s.rotationCfg.CheckIntervalMinutes.String(),
"archiveDir": s.rotationCfg.ArchiveDir,
}
return info, nil
}
func createTables(db *sql.DB) error {
createTableStmt := `
CREATE TABLE IF NOT EXISTS log_entries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
service TEXT,
timestamp DATETIME NOT NULL,
type TEXT NOT NULL,
host TEXT NOT NULL,
tool TEXT,
log_level TEXT,
log_message TEXT,
raw TEXT,
priority TEXT,
priority_name TEXT,
unit TEXT,
pid INTEGER,
boot_id TEXT,
machine_id TEXT,
fields TEXT,
service_information TEXT,
system_metrics TEXT,
tool_information TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
exported_at DATETIME
);`
if _, err := db.Exec(createTableStmt); err != nil {
return err
}
indexes := []string{
"CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp);",
"CREATE INDEX IF NOT EXISTS idx_service ON log_entries(service);",
"CREATE INDEX IF NOT EXISTS idx_type ON log_entries(type);",
"CREATE INDEX IF NOT EXISTS idx_tool ON log_entries(tool);",
"CREATE INDEX IF NOT EXISTS idx_log_level ON log_entries(log_level);",
"CREATE INDEX IF NOT EXISTS idx_exported ON log_entries(exported_at);",
"CREATE INDEX IF NOT EXISTS idx_composite ON log_entries(timestamp, type, service);",
}
for _, index := range indexes {
if _, err := db.Exec(index); err != nil {
return fmt.Errorf("failed to create index: %w", err)
}
}
return nil
}
func (s *SQLiteStorage) Store(ctx context.Context, entry *models.LogMessage) error {
return s.StoreBatch(ctx, []models.LogMessage{*entry})
}
func (s *SQLiteStorage) StoreBatch(ctx context.Context, entries []models.LogMessage) error {
if len(entries) == 0 {
return nil
}
s.mu.RLock()
defer s.mu.RUnlock()
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback()
stmt, err := tx.PrepareContext(ctx, `
INSERT INTO log_entries
(service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name,
unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
if err != nil {
return fmt.Errorf("failed to prepare statement: %w", err)
}
defer stmt.Close()
for _, entry := range entries {
fieldsJSON, _ := json.Marshal(entry.Fields)
serviceInfoJSON, _ := json.Marshal(entry.ServiceInformation)
systemMetricsJSON, _ := json.Marshal(entry.SystemMetrics)
toolInfoJSON, _ := json.Marshal(entry.ToolInformation)
_, err := stmt.ExecContext(ctx,
entry.Service,
entry.Timestamp,
entry.Type,
entry.Host,
entry.Tool,
entry.LogLevel,
entry.LogMessage,
entry.Raw,
entry.Priority,
entry.PriorityName,
entry.Unit,
entry.PID,
entry.BootID,
entry.MachineID,
string(fieldsJSON),
string(serviceInfoJSON),
string(systemMetricsJSON),
string(toolInfoJSON),
)
if err != nil {
return fmt.Errorf("failed to insert entry: %w", err)
}
}
return tx.Commit()
}
func (s *SQLiteStorage) Query(ctx context.Context, query StorageQuery) ([]models.LogMessage, error) {
s.mu.RLock()
defer s.mu.RUnlock()
sqlQuery := "SELECT service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name, unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information FROM log_entries WHERE 1=1"
args := []any{}
argCount := 0
if !query.StartTime.IsZero() {
argCount++
sqlQuery += fmt.Sprintf(" AND timestamp >= ?%d", argCount)
args = append(args, query.StartTime)
}
if !query.EndTime.IsZero() {
argCount++
sqlQuery += fmt.Sprintf(" AND timestamp <= ?%d", argCount)
args = append(args, query.EndTime)
}
if query.Service != "" {
argCount++
sqlQuery += fmt.Sprintf(" AND service = ?%d", argCount)
args = append(args, query.Service)
}
if query.Tool != "" {
argCount++
sqlQuery += fmt.Sprintf(" AND tool = ?%d", argCount)
args = append(args, query.Tool)
}
if query.LogLevel != "" {
argCount++
sqlQuery += fmt.Sprintf(" AND log_level = ?%d", argCount)
args = append(args, query.LogLevel)
}
if query.Type != "" {
argCount++
sqlQuery += fmt.Sprintf(" AND type = ?%d", argCount)
args = append(args, query.Type)
}
if query.OrderBy != "" {
direction := "ASC"
if query.OrderDesc {
direction = "DESC"
}
sqlQuery += fmt.Sprintf(" ORDER BY %s %s", query.OrderBy, direction)
} else {
sqlQuery += " ORDER BY timestamp DESC"
}
if query.Limit > 0 {
sqlQuery += fmt.Sprintf(" LIMIT %d", query.Limit)
if query.Offset > 0 {
sqlQuery += fmt.Sprintf(" OFFSET %d", query.Offset)
}
}
rows, err := s.db.QueryContext(ctx, sqlQuery, args...)
if err != nil {
return nil, fmt.Errorf("failed to execute query: %w", err)
}
defer rows.Close()
var entries []models.LogMessage
for rows.Next() {
var entry models.LogMessage
var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string
err := rows.Scan(
&entry.Service,
&entry.Timestamp,
&entry.Type,
&entry.Host,
&entry.Tool,
&entry.LogLevel,
&entry.LogMessage,
&entry.Raw,
&entry.Priority,
&entry.PriorityName,
&entry.Unit,
&entry.PID,
&entry.BootID,
&entry.MachineID,
&fieldsJSON,
&serviceInfoJSON,
&systemMetricsJSON,
&toolInfoJSON,
)
if err != nil {
return nil, fmt.Errorf("failed to scan row: %w", err)
}
if fieldsJSON != "" && fieldsJSON != "null" {
json.Unmarshal([]byte(fieldsJSON), &entry.Fields)
}
if serviceInfoJSON != "" && serviceInfoJSON != "null" {
json.Unmarshal([]byte(serviceInfoJSON), &entry.ServiceInformation)
}
if systemMetricsJSON != "" && systemMetricsJSON != "null" {
json.Unmarshal([]byte(systemMetricsJSON), &entry.SystemMetrics)
}
if toolInfoJSON != "" && toolInfoJSON != "null" {
json.Unmarshal([]byte(toolInfoJSON), &entry.ToolInformation)
}
entries = append(entries, entry)
}
return entries, rows.Err()
}
func (s *SQLiteStorage) MarkAsExported(ctx context.Context, ids []int64) error {
if len(ids) == 0 {
return nil
}
s.mu.RLock()
defer s.mu.RUnlock()
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
}
placeholders := strings.Repeat("?,", len(ids))
placeholders = placeholders[:len(placeholders)-1]
sqlQuery := fmt.Sprintf("UPDATE log_entries SET exported_at = CURRENT_TIMESTAMP WHERE id IN (%s)", placeholders)
args := make([]any, len(ids))
for i, id := range ids {
args[i] = id
}
_, err = tx.ExecContext(ctx, sqlQuery, args...)
if err != nil {
tx.Rollback()
return err
}
return tx.Commit()
}
func (s *SQLiteStorage) GetUnexportedEntries(ctx context.Context, limit int) ([]models.LogMessage, error) {
s.mu.RLock()
defer s.mu.RUnlock()
sqlQuery := `SELECT id, service, timestamp, type, host, tool, log_level, log_message, raw, priority, priority_name,
unit, pid, boot_id, machine_id, fields, service_information, system_metrics, tool_information
FROM log_entries WHERE exported_at IS NULL ORDER BY timestamp ASC`
if limit > 0 {
sqlQuery += fmt.Sprintf(" LIMIT %d", limit)
}
rows, err := s.db.QueryContext(ctx, sqlQuery)
if err != nil {
return nil, fmt.Errorf("failed to execute query: %w", err)
}
defer rows.Close()
var entries []models.LogMessage
for rows.Next() {
var entry models.LogMessage
var id int64
var fieldsJSON, serviceInfoJSON, systemMetricsJSON, toolInfoJSON string
err := rows.Scan(
&id,
&entry.Service,
&entry.Timestamp,
&entry.Type,
&entry.Host,
&entry.Tool,
&entry.LogLevel,
&entry.LogMessage,
&entry.Raw,
&entry.Priority,
&entry.PriorityName,
&entry.Unit,
&entry.PID,
&entry.BootID,
&entry.MachineID,
&fieldsJSON,
&serviceInfoJSON,
&systemMetricsJSON,
&toolInfoJSON,
)
if err != nil {
return nil, fmt.Errorf("failed to scan row: %w", err)
}
if entry.Fields == nil {
entry.Fields = make(map[string]any)
}
entry.Fields["_internal_id"] = id
if fieldsJSON != "" && fieldsJSON != "null" {
json.Unmarshal([]byte(fieldsJSON), &entry.Fields)
}
if serviceInfoJSON != "" && serviceInfoJSON != "null" {
json.Unmarshal([]byte(serviceInfoJSON), &entry.ServiceInformation)
}
if systemMetricsJSON != "" && systemMetricsJSON != "null" {
json.Unmarshal([]byte(systemMetricsJSON), &entry.SystemMetrics)
}
if toolInfoJSON != "" && toolInfoJSON != "null" {
json.Unmarshal([]byte(toolInfoJSON), &entry.ToolInformation)
}
entries = append(entries, entry)
}
return entries, rows.Err()
}
func (s *SQLiteStorage) Close() error {
close(s.rotationStop)
s.rotationWg.Wait()
s.mu.Lock()
defer s.mu.Unlock()
return s.db.Close()
}

View file

@ -4,7 +4,7 @@ import (
"context" "context"
"log/slog" "log/slog"
"time" "time"
"tixel_watch/models" "watch-tool/models"
) )
type LogProcessor struct { type LogProcessor struct {

110
main.go
View file

@ -8,26 +8,36 @@ import (
"sync" "sync"
"syscall" "syscall"
"time" "time"
"tixel_watch/models" "watch-tool/helpers"
"watch-tool/models"
"watch-tool/patterns"
) )
var hostname string var currentHostname string
func init() { func init() {
var err error var err error
hostname, err = os.Hostname() currentHostname, err = os.Hostname()
if err != nil { if err != nil {
hostname = "unknown" currentHostname = "unknown"
slog.Warn("Could not determine hostname, using fallback", "fallback", currentHostname)
} }
} }
func main() { func main() {
cfg, err := LoadConfigV2() cfg, err := LoadConfig()
if err != nil { if err != nil {
slog.Error("error loading configuration", "error", err) slog.Error("Startup failed: configuration error", "error", err)
os.Exit(1) os.Exit(1)
} }
slog.Info("TIXEL System Monitor started")
slog.Info("System Monitor started", "hostname", currentHostname)
if err := patterns.GetInstance().Load(cfg.PatternsFile); err != nil {
slog.Error("Startup failed: could not load patterns", "file", cfg.PatternsFile, "error", err)
os.Exit(1)
}
slog.Info("Regex patterns loaded successfully", "file", cfg.PatternsFile)
var storage StorageInterface var storage StorageInterface
if cfg.LocalStorage.Enable { if cfg.LocalStorage.Enable {
@ -46,7 +56,7 @@ func main() {
} }
storage = sqliteStorage storage = sqliteStorage
defer storage.Close() defer storage.Close()
slog.Info("SQLite storage with rotation initialized", "path", cfg.LocalStorage.DBPath) slog.Info("SQLite storage initialized", "path", cfg.LocalStorage.DBPath)
} else { } else {
slog.Error("Local storage is disabled, but it's required for the new architecture") slog.Error("Local storage is disabled, but it's required for the new architecture")
os.Exit(1) os.Exit(1)
@ -65,7 +75,7 @@ func main() {
exportManager = NewExportManager(storage, exportConfig) exportManager = NewExportManager(storage, exportConfig)
if cfg.Elasticsearch.Enabled { if cfg.Elasticsearch.Enabled {
esExporter, err := NewElasticsearchExporterV2(cfg.Elasticsearch) esExporter, err := NewElasticsearchExporter(cfg.Elasticsearch)
if err != nil { if err != nil {
slog.Error("failed to create Elasticsearch exporter", "error", err) slog.Error("failed to create Elasticsearch exporter", "error", err)
os.Exit(1) os.Exit(1)
@ -79,10 +89,6 @@ func main() {
exportManager.RegisterExporter("elasticsearch", esExporter) exportManager.RegisterExporter("elasticsearch", esExporter)
slog.Info("Elasticsearch exporter registered") slog.Info("Elasticsearch exporter registered")
} }
// Add more exporters here in the future
// exportManager.RegisterExporter("checkmk", checkmkExporter)
// exportManager.RegisterExporter("grafana", grafanaExporter)
} }
logChan := make(chan models.LogMessage, 1000) logChan := make(chan models.LogMessage, 1000)
@ -92,86 +98,92 @@ func main() {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
go func() { helpers.SafeGo(ctx, "LogProcessor", func() {
defer wg.Done() defer wg.Done()
processor := NewLogProcessor(storage) processor := NewLogProcessor(storage)
processor.Start(ctx, logChan) processor.Start(ctx, logChan)
}() })
if exportManager != nil { if exportManager != nil {
wg.Add(1) wg.Add(1)
go func() { helpers.SafeGo(ctx, "ExportManager", func() {
defer wg.Done() defer wg.Done()
exportManager.Start(ctx) exportManager.Start(ctx)
}() })
} }
for _, service := range cfg.Services { for _, service := range cfg.Services {
if !service.Enabled { if !service.Enabled {
slog.Info("Service deactivated, skipping...", "service", service.Name) slog.Debug("Service deactivated, skipping...", "service", service.Name)
continue continue
} }
wg.Add(1) wg.Add(1)
go func(s ServiceConfig) { srv := service
defer wg.Done()
monitor := NewServiceMonitor(s)
if err := monitor.Start(ctx, logChan); err != nil {
slog.Error("error watching service", "service", s.Name, "error", err)
}
}(service)
slog.Info("started watching Service-Log", "service", service.Name) helpers.SafeGo(ctx, "ServiceMonitor-"+srv.Name, func() {
defer wg.Done()
monitor := NewServiceMonitor(srv, currentHostname)
if err := monitor.Start(ctx, logChan); err != nil {
slog.Error("Error watching service", "service", srv.Name, "error", err)
}
})
slog.Info("Started watching Service-Log", "service", service.Name)
} }
for _, tool := range cfg.Tools { for _, tool := range cfg.Tools {
if !tool.Enabled { if !tool.Enabled {
slog.Info("Tool is deactivated, skipping...", "tool", tool.Name) slog.Debug("Tool is deactivated, skipping...", "tool", tool.Name)
continue continue
} }
wg.Add(1) wg.Add(1)
go func(t ToolConfig) { t := tool
defer wg.Done()
monitor := NewFileMonitor(t)
if err := monitor.Start(ctx, logChan); err != nil {
slog.Error("error watching", "tool", t.Name, "error", err)
}
}(tool)
slog.Info("started watching logs", "tool", tool.Name, "file", tool.LogFile) helpers.SafeGo(ctx, "FileMonitor-"+t.Name, func() {
defer wg.Done()
monitor := NewFileMonitor(t, currentHostname)
if err := monitor.Start(ctx, logChan); err != nil {
slog.Error("Error watching tool", "tool", t.Name, "error", err)
}
})
slog.Info("Started watching logs", "tool", tool.Name, "file", tool.LogFile)
} }
if cfg.SystemMetrics.Enabled { if cfg.SystemMetrics.Enabled {
wg.Add(1) wg.Add(1)
go func() { helpers.SafeGo(ctx, "SystemMetrics", func() {
defer wg.Done() defer wg.Done()
collector := NewSystemMetricsCollector(cfg.SystemMetrics, cfg.PollIntervalSeconds) collector := NewSystemMetricsCollector(cfg.SystemMetrics, cfg.PollIntervalSeconds, currentHostname)
collector.StartV2(ctx, storage, logChan) collector.Start(ctx, storage, logChan)
}() })
slog.Info("Started collecting System-Metrics") slog.Info("Started collecting System-Metrics")
} }
if cfg.WebService.Enabled { if cfg.WebService.Enabled {
wg.Add(1) wg.Add(1)
go func() { helpers.SafeGo(ctx, "WebService", func() {
defer wg.Done() defer wg.Done()
webService := NewWebServiceV2(cfg, storage) webService := NewWebService(cfg, storage)
if err := webService.Start(ctx); err != nil { if err := webService.Start(ctx); err != nil {
slog.Error("web service error", "error", err) slog.Error("Web service error", "error", err)
} }
}() })
slog.Info("Web service started", "host", cfg.WebService.Host, "port", cfg.WebService.Port) slog.Info("Web service started", "host", cfg.WebService.Host, "port", cfg.WebService.Port)
} }
sigCh := make(chan os.Signal, 1) sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
<-sigCh s := <-sigCh
slog.Info("Shutdown-Signal received, stopping threads...") slog.Info("Shutdown signal received, stopping threads...", "signal", s)
cancel() cancel()
close(logChan)
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
@ -181,9 +193,11 @@ func main() {
select { select {
case <-done: case <-done:
slog.Info("All threads closed") close(logChan)
slog.Info("All threads closed gracefully")
case <-time.After(10 * time.Second): case <-time.After(10 * time.Second):
slog.Info("Shutdown-Timeout reached, force quitting") slog.Error("Shutdown timeout reached, force quitting")
os.Exit(1)
} }
slog.Info("Program stopped") slog.Info("Program stopped")

View file

@ -1,49 +0,0 @@
package parser
import (
"log/slog"
"regexp"
"strings"
"time"
"tixel_watch/helpers"
"tixel_watch/models"
)
var (
amServicePattern = regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z)\s+(\w+)\s+(\d+)\s+---\s+\[\s*([^\]]*)\]\s+([\w\.]+)\s*:\s*(.*)$`)
)
type AMParser struct{}
func (a *AMParser) Parse(line string) (models.LogMessage, error) {
newEntry := models.LogMessage{
Service: "access-manager",
}
syslogFields, logContent := helpers.ExtractSyslogHeader(line)
newEntry.Host = syslogFields.Hostname
matches := amServicePattern.FindStringSubmatch(strings.TrimSpace(logContent))
if len(matches) != 7 {
newEntry.Timestamp = time.Now()
newEntry.LogMessage = line
return newEntry, nil
}
timestampStr := strings.Join(strings.Split(matches[1], " "), "T")
timestamp, err := helpers.ParseRFC3339WithOptionalZ(timestampStr)
if err != nil {
slog.Error("unable to parse time", "error", err)
return newEntry, err
}
baseInfo := models.AMBaseInfo{
ProcessID: matches[3],
ThreadID: strings.TrimSpace(matches[4]),
LoggerName: matches[5],
}
newEntry.Timestamp = timestamp
newEntry.LogLevel = matches[2]
newEntry.LogMessage = matches[6]
newEntry.ServiceInformation = baseInfo
return newEntry, nil
}

View file

@ -1,28 +0,0 @@
package parser
import (
"strings"
"time"
"tixel_watch/models"
)
type DefaultParser struct {
Service string
Tool string
}
func (d *DefaultParser) Parse(line string) (models.LogMessage, error) {
msg := models.LogMessage{
LogLevel: "unknown",
LogMessage: strings.TrimSpace(line),
Raw: line,
Timestamp: time.Now(),
}
if d.Service != "" {
msg.Service = d.Service
}
if d.Tool != "" {
msg.Tool = d.Tool
}
return msg, nil
}

View file

@ -1,27 +1,10 @@
package parser package parser
func New(serviceName, logType string) (Parser, error) { func New(serviceName, logType, hostname string) (Parser, error) {
switch logType { switch logType {
case "custom":
switch serviceName {
case "tixstream":
return &TSParser{}, nil
case "transfer-job-manager":
return &TJMParser{}, nil
case "access-manager":
return &AMParser{}, nil
case "tixel-control-center":
return &TCCParser{}, nil
case "nginx":
return &NginxParser{}, nil
case "nginx-tjm":
return &NginxTJMLogParser{ToolName: serviceName}, nil
default:
return &DefaultParser{Service: serviceName}, nil
}
case "json": case "json":
return &JSONParser{}, nil return &JSONParser{}, nil
default: default:
return &DefaultParser{Service: serviceName}, nil return NewGenericParser(serviceName, hostname), nil
} }
} }

180
parser/generic_parser.go Normal file
View file

@ -0,0 +1,180 @@
package parser
import (
"fmt"
"log/slog"
"strconv"
"strings"
"time"
"watch-tool/models"
"watch-tool/patterns"
)
type GenericParser struct {
ServiceName string
Hostname string
Extractors []patterns.CompiledExtractor
CommonExt []patterns.CompiledExtractor
}
func NewGenericParser(serviceName, hostname string) *GenericParser {
repo := patterns.GetInstance()
var svcExt, commonExt []patterns.CompiledExtractor
if repo != nil {
svcExt = repo.GetExtractors(serviceName)
commonExt = repo.GetExtractors("common")
} else {
slog.Error("CRITICAL: Pattern Repository is nil. Parser will not work correctly.")
}
return &GenericParser{
ServiceName: serviceName,
Hostname: hostname,
Extractors: svcExt,
CommonExt: commonExt,
}
}
func (p *GenericParser) Parse(line string) (models.LogMessage, error) {
entry := models.LogMessage{
Service: p.ServiceName,
Host: p.Hostname,
Timestamp: time.Now(),
Raw: line,
Fields: make(map[string]any),
Type: "log_entry",
}
trimmedLine := strings.TrimSpace(line)
if trimmedLine == "" {
return entry, nil
}
allExtractors := append(p.CommonExt, p.Extractors...)
matchedAny := false
for _, ext := range allExtractors {
matches := ext.Pattern.FindStringSubmatch(trimmedLine)
if matches == nil {
continue
}
matchedAny = true
subexpNames := ext.Pattern.SubexpNames()
for i, matchValue := range matches {
if i == 0 {
continue
}
groupName := subexpNames[i]
if groupName == "" {
continue
}
cleanValue := strings.TrimSpace(matchValue)
targetType := ext.Fields[groupName]
parsedValue := p.safeConvert(cleanValue, targetType)
p.mapField(&entry, groupName, parsedValue)
}
}
if !matchedAny {
entry.LogMessage = trimmedLine
entry.Fields["_parse_status"] = "failed"
} else if entry.LogMessage == "" {
entry.LogMessage = trimmedLine
}
return entry, nil
}
func (p *GenericParser) safeConvert(value, typeDef string) any {
if value == "" || value == "-" {
if strings.HasPrefix(typeDef, "int") || strings.HasPrefix(typeDef, "float") {
return 0
}
return value
}
var err error
var result any
switch {
case strings.HasPrefix(typeDef, "int"):
var i int
i, err = strconv.Atoi(value)
result = i
case strings.HasPrefix(typeDef, "float"):
var f float64
f, err = strconv.ParseFloat(value, 64)
result = f
case strings.HasPrefix(typeDef, "time:"):
layout := strings.TrimPrefix(typeDef, "time:")
result, err = p.parseTimeRobust(value, layout)
case typeDef == "bool":
var b bool
b, err = strconv.ParseBool(value)
result = b
default:
return value
}
if err != nil {
return value
}
return result
}
func (p *GenericParser) parseTimeRobust(value, layout string) (time.Time, error) {
if layout == "Jan 02 15:04:05" {
t, err := time.Parse(layout, value)
if err != nil {
return time.Time{}, err
}
now := time.Now()
year := now.Year()
if t.Month() > now.Month() {
year--
}
return t.AddDate(year, 0, 0), nil
}
return time.Parse(layout, value)
}
func (p *GenericParser) mapField(entry *models.LogMessage, key string, value any) {
switch key {
case "timestamp", "time":
if t, ok := value.(time.Time); ok {
entry.Timestamp = t
}
case "log_level", "level":
entry.LogLevel = fmt.Sprintf("%v", value)
case "message", "msg":
entry.LogMessage = fmt.Sprintf("%v", value)
case "host", "hostname":
entry.Host = fmt.Sprintf("%v", value)
case "service":
entry.Service = fmt.Sprintf("%v", value)
case "pid":
if v, ok := value.(int); ok {
entry.PID = v
} else if vStr, ok := value.(string); ok {
if pid, err := strconv.Atoi(vStr); err == nil {
entry.PID = pid
}
}
default:
entry.Fields[key] = value
}
}

View file

@ -3,7 +3,7 @@ package parser
import ( import (
"encoding/json" "encoding/json"
"log/slog" "log/slog"
"tixel_watch/models" "watch-tool/models"
) )
type JSONParser struct{} type JSONParser struct{}

View file

@ -1,57 +0,0 @@
package parser
import (
"log/slog"
"regexp"
"strconv"
"strings"
"tixel_watch/models"
)
var (
nginxAccessPattern = regexp.MustCompile(`^(\S+)\s+\S+\s+(\S+)\s+\[([^\]]+)\]\s+"([^"]+)"\s+(\d+)\s+(\d+|-)\s*(?:"([^"]*)"\s+"([^"]*)")?`)
)
type NginxParser struct{}
func (n *NginxParser) Parse(line string) (models.LogMessage, error) {
newEntry := models.LogMessage{
Service: "nginx",
}
matches := nginxAccessPattern.FindStringSubmatch(strings.TrimSpace(line))
if len(matches) < 7 {
return newEntry, nil
}
statusCode, err := strconv.ParseInt(matches[5], 10, 64)
if err != nil {
slog.Error("cant parse statuscode", "error", err)
}
bytesSend, err := strconv.ParseInt(matches[6], 10, 64)
if err != nil {
slog.Error("cant parse bytessend", "error", err)
}
baseInfo := models.NGinXBaseInfo{
ClientIP: matches[1],
RemoteUser: matches[2],
Request: matches[4],
StatusCode: int(statusCode),
BytesSend: int(bytesSend),
}
if len(matches) > 7 && matches[7] != "" {
baseInfo.Referer = matches[7]
}
if len(matches) > 8 && matches[8] != "" {
baseInfo.UserAgent = matches[8]
}
if requestParts := strings.Fields(matches[4]); len(requestParts) >= 3 {
baseInfo.HTTPMethod = requestParts[0]
baseInfo.RequestURI = requestParts[1]
baseInfo.HTTPVersion = requestParts[2]
}
newEntry.ServiceInformation = baseInfo
return newEntry, nil
}

View file

@ -4,12 +4,13 @@ import (
"log/slog" "log/slog"
"strconv" "strconv"
"strings" "strings"
"tixel_watch/helpers" "watch-tool/helpers"
"tixel_watch/models" "watch-tool/models"
) )
type NginxTJMLogParser struct { type NginxTJMLogParser struct {
ToolName string ToolName string
Hostname string
} }
func (p *NginxTJMLogParser) Parse(line string) (models.LogMessage, error) { func (p *NginxTJMLogParser) Parse(line string) (models.LogMessage, error) {
@ -18,11 +19,7 @@ func (p *NginxTJMLogParser) Parse(line string) (models.LogMessage, error) {
Tool: p.ToolName, Tool: p.ToolName,
Raw: line, Raw: line,
} }
hostname, err := helpers.GetHostname() entry.Host = p.Hostname
if err != nil {
return entry, err
}
entry.Host = hostname
entry = p.parseNginxTJM(entry) entry = p.parseNginxTJM(entry)
return entry, nil return entry, nil
} }

View file

@ -1,11 +1,9 @@
package parser package parser
import ( import (
"tixel_watch/models" "watch-tool/models"
) )
type Parser interface { type Parser interface {
//TODO: Change parsers to return an error as well
Parse(line string) (models.LogMessage, error) Parse(line string) (models.LogMessage, error)
// Parse(line string) models.LogMessage
} }

View file

@ -1,64 +0,0 @@
package parser
import (
"log/slog"
"regexp"
"strings"
"tixel_watch/helpers"
"tixel_watch/models"
)
type RegexLogParser struct {
Pattern *regexp.Regexp
Fields map[string]string
Toolname string
}
func (p *RegexLogParser) Parse(line string) (models.LogMessage, error) {
entry := models.LogMessage{Type: "log_entry"}
entry.Tool = p.Toolname
entry.Raw = line
hostname, err := helpers.GetHostname()
if err != nil {
slog.Warn("cannot get hostname")
return entry, err
}
entry.Host = hostname
fields := p.parseWithPattern(line)
if fields != nil {
entry.Fields = fields
} else {
entry.LogMessage = strings.TrimSpace(line)
}
return entry, nil
}
func (p *RegexLogParser) parseWithPattern(text string) map[string]any {
matches := p.Pattern.FindStringSubmatch(text)
if matches == nil {
return nil
}
fields := make(map[string]any)
subexpNames := p.Pattern.SubexpNames()
for i, match := range matches {
if i == 0 {
continue
}
if i < len(subexpNames) && subexpNames[i] != "" {
fieldName := subexpNames[i]
if mappedName, exists := p.Fields[fieldName]; exists {
fieldName = mappedName
}
fields[fieldName] = match
}
}
return fields
}

View file

@ -1,49 +0,0 @@
package parser
import (
"log/slog"
"regexp"
"strings"
"time"
"tixel_watch/helpers"
"tixel_watch/models"
)
var (
tccServicePattern = regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z)\s+(\w+)\s+(\d+)\s+---\s+\[\s*([^\]]*)\]\s+([\w\.]+)\s*:\s*(.*)$`)
)
type TCCParser struct{}
func (t *TCCParser) Parse(line string) (models.LogMessage, error) {
newEntry := models.LogMessage{
Service: "tixel-control-center",
}
syslogFields, logContent := helpers.ExtractSyslogHeader(line)
newEntry.Host = syslogFields.Hostname
matches := tccServicePattern.FindStringSubmatch(strings.TrimSpace(logContent))
if len(matches) != 7 {
newEntry.Timestamp = time.Now()
newEntry.LogMessage = line
return newEntry, nil
}
timestampStr := strings.Join(strings.Split(matches[1], " "), "T")
timestamp, err := helpers.ParseRFC3339WithOptionalZ(timestampStr)
if err != nil {
slog.Error("unable to parse time", "error", err)
return newEntry, err
}
baseInfo := models.TCCBaseInfo{
ProcessID: matches[3],
ThreadID: strings.TrimSpace(matches[4]),
LoggerName: matches[5],
}
newEntry.Timestamp = timestamp
newEntry.LogLevel = matches[2]
newEntry.LogMessage = matches[6]
newEntry.ServiceInformation = baseInfo
return newEntry, nil
}

View file

@ -1,91 +0,0 @@
package parser
import (
"log/slog"
"regexp"
"strings"
"tixel_watch/helpers"
"tixel_watch/models"
)
var (
tjmServicePattern = regexp.MustCompile(`^(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\s+(?<level>\S+)\s+(?<pid>\d+).*?\[(?<collatation_id>[^\]]*)\]\s+\[(?<username>[^\]]*)\]\s+\[(?<thread>[^\]]*)\]\s+(?<class>.*?)\s+:\s+(?<message>.*)`)
tjmTransferNamePattern = regexp.MustCompile(`^(\d{8}T\d{6}-[A-Za-z0-9]+-.+?-(?:in|out)) ?: (.*)$`)
tjmTransferIDPattern1 = regexp.MustCompile(`(?P<transfer>\w{8}-\w{4}-\w{4}-\w{4}-\w{12}).*?(?P<message>.*)`)
tjmTransferIDPattern2 = regexp.MustCompile(`(?P<before>.*)(?P<transfer>\w{8}-\w{4}-\w{4}-\w{4}-\w{12}).*?(?P<message>.*)`)
)
type TJMParser struct{}
func (t *TJMParser) Parse(line string) (models.LogMessage, error) {
newEntry := models.LogMessage{
Service: "transfer-job-manager",
}
syslogFields, logContent := helpers.ExtractSyslogHeader(line)
newEntry.Host = syslogFields.Hostname
msg := strings.TrimSpace(logContent)
msg = strings.ReplaceAll(msg, " ", " ")
msg = strings.ReplaceAll(msg, "---", "")
msg = strings.ReplaceAll(msg, " ", " ")
parts := strings.Fields(msg)
if len(parts) < 4 {
newEntry.LogMessage = logContent
return newEntry, nil
}
matches := tjmServicePattern.FindStringSubmatch(logContent)
var baseInfo models.TJMTransferInfo
if len(matches) > 0 {
timestampStr := strings.Join(strings.Split(matches[1], " "), "T")
timestamp, err := helpers.ParseRFC3339WithOptionalZ(timestampStr)
if err != nil {
slog.Error("unable to parse time", "error", err)
}
newEntry.Timestamp = timestamp
newEntry.LogLevel = strings.TrimSpace(matches[2])
newEntry.LogMessage = strings.TrimSpace(matches[8])
baseInfo = models.TJMTransferInfo{
ProcessID: strings.TrimSpace(matches[3]),
CorrelationID: strings.TrimSpace(matches[4]),
Username: strings.TrimSpace(matches[5]),
ThreadID: strings.TrimSpace(matches[6]),
JavaClass: strings.TrimSpace(matches[7]),
}
} else {
newEntry.LogMessage = logContent
}
trNameMatch := tjmTransferNamePattern.FindStringSubmatch(newEntry.LogMessage)
var transferName string
var transferID string
if len(trNameMatch) > 0 {
transferName = trNameMatch[1]
newEntry.LogMessage = trNameMatch[2]
if strings.Contains(trNameMatch[1], "-in") {
baseInfo.Direction = "incoming"
}
if strings.Contains(trNameMatch[1], "-out") {
baseInfo.Direction = "outgoing"
}
}
trIDMatch := tjmTransferIDPattern1.FindStringSubmatch(newEntry.LogMessage)
if len(trIDMatch) > 0 {
transferID = trIDMatch[1]
}
trIDMatch = tjmTransferIDPattern2.FindStringSubmatch(newEntry.LogMessage)
if len(trIDMatch) > 0 {
transferID = trIDMatch[2]
}
if transferID != "" {
baseInfo.TransferID = transferID
} else if transferName != "" {
baseInfo.TransferID = transferName
} else {
baseInfo.TransferID = "no_transfer_id"
}
if baseInfo.StartTime.IsZero() {
baseInfo.StartTime = newEntry.Timestamp
}
newEntry.ServiceInformation = baseInfo
return newEntry, nil
}

View file

@ -1,136 +0,0 @@
package parser
import (
"log/slog"
"regexp"
"strconv"
"strings"
"tixel_watch/helpers"
"tixel_watch/models"
)
var (
tsServicePattern = regexp.MustCompile(`^(?<level>\S+)\s+(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6})\s+(?<message>.*)`)
tsTransferIDPattern = regexp.MustCompile(`^(?<transfer>\w{8}-\w{4}-\w{4}-\w{4}-\w{12})\s+(?<message>.*)`)
tsDetailPattern1 = regexp.MustCompile(`in: Transfer start (?P<thread>\d+/\d+) buffers=(?P<buffers>\d+) files=(?P<files>\d+) size=(?P<size>[0-9.]+) MByte chunksize=(?P<chunksize>\d+) streams=(?P<streams>\d+) target-datarate=(?P<target_datarate>[0-9.]+) MByte/s protocol=(?P<protocol>\w+) dest=(?P<dest>\S+) sender-id=(?P<sender_id>\S+)`)
tsDetailPattern2 = regexp.MustCompile(`out: Start remote transfer to (?P<target>[^\s]+) request executed, duration=(?P<duration>[0-9.]+) s`)
tsDetailPattern3 = regexp.MustCompile(`out: Transfer start (?P<thread>\d+/\d+) buffers=(?P<buffers>\d+) files=(?P<files>\d+) size=(?P<size>[0-9.]+) MByte chunksize=(?P<chunksize>\d+) streams=(?P<streams>\d+) target-datarate=(?P<target_datarate>[0-9.]+) MByte/s protocol=(?P<protocol>\w+) src=(?P<src>\S+) receiver=(?P<receiver>\S+)`)
tsDetailPattern4 = regexp.MustCompile(`out: Start transfer (?P<thread>\d+/\d+), src=(?P<src>[^ ]*) dest=(?P<dest>[^ ]*) item\[0\]=(?P<item0>[^ ]*) count=(?P<count>\d+)`)
)
type TSParser struct{}
func (p *TSParser) Parse(line string) (models.LogMessage, error) {
newEntry := models.LogMessage{
Service: "tixstream",
}
syslogFields, logContent := helpers.ExtractSyslogHeader(line)
newEntry.Host = syslogFields.Hostname
newEntry.Raw = line
newEntry.Type = "service_log"
matches := tsServicePattern.FindStringSubmatch(logContent)
if len(matches) > 0 {
timestampStr := strings.Join(strings.Split(matches[2], " "), "T")
timestamp, err := helpers.ParseRFC3339WithOptionalZ(timestampStr)
if err != nil {
slog.Error("unable to parse time", "error", err)
}
if timestamp.IsZero() {
timestamp = syslogFields.SysLogTimestamp
}
newEntry.LogLevel = strings.TrimSpace(matches[1])
newEntry.LogLevel = strings.ReplaceAll(newEntry.LogLevel, "ACE_Message_Block", "")
newEntry.Timestamp = timestamp
newEntry.LogMessage = strings.TrimSpace(matches[3])
} else {
newEntry.LogMessage = logContent
}
var baseInfo models.TSTransferInfo
trNameMatch := tsTransferIDPattern.FindStringSubmatch(newEntry.LogMessage)
var transferID string
if len(trNameMatch) > 0 {
transferID = trNameMatch[1]
newEntry.LogMessage = trNameMatch[2]
split := strings.Fields(trNameMatch[2])
switch split[0] {
case "in:":
baseInfo.Direction = "incoming"
case "out:":
baseInfo.Direction = "outgoing"
}
}
msg := strings.ReplaceAll(logContent, " ", " ")
parts := strings.Fields(msg)
if len(parts) < 5 {
return newEntry, nil
}
tsDetail := tsDetailPattern1.FindStringSubmatch(newEntry.LogMessage)
if len(tsDetail) > 0 {
threadInt, _ := strconv.Atoi(strings.Split(tsDetail[1], "/")[0])
baseInfo.Lane = threadInt
buffersInt, _ := strconv.Atoi(tsDetail[2])
fileCountInt, _ := strconv.Atoi(tsDetail[3])
fileSizeFloat, _ := strconv.ParseFloat(tsDetail[4], 64)
streamsInt, _ := strconv.Atoi(tsDetail[6])
datarateFloat, _ := strconv.ParseFloat(tsDetail[7], 64)
chunkSizeInt, _ := strconv.Atoi(tsDetail[5])
baseInfo.Buffers = buffersInt
baseInfo.FileCount = fileCountInt
baseInfo.FileSizeMB = fileSizeFloat
baseInfo.ChunkSize = chunkSizeInt
baseInfo.Streams = streamsInt
baseInfo.TargetDatarate = datarateFloat
baseInfo.Protocoll = tsDetail[8]
baseInfo.Dest = tsDetail[9]
baseInfo.SenderID = tsDetail[10]
}
tsDetail = tsDetailPattern2.FindStringSubmatch(newEntry.LogMessage)
if len(tsDetail) > 0 {
baseInfo.Target = tsDetail[1]
}
tsDetail = tsDetailPattern3.FindStringSubmatch(newEntry.LogMessage)
if len(tsDetail) > 0 {
threadInt, _ := strconv.Atoi(strings.Split(tsDetail[1], "/")[0])
baseInfo.Lane = threadInt
buffersInt, _ := strconv.Atoi(tsDetail[2])
baseInfo.Buffers = buffersInt
fileCountInt, _ := strconv.Atoi(tsDetail[3])
fileSizeFloat, _ := strconv.ParseFloat(tsDetail[4], 64)
chunkSizeInt, _ := strconv.Atoi(tsDetail[5])
streamsInt, _ := strconv.Atoi(tsDetail[6])
datarateFloat, _ := strconv.ParseFloat(tsDetail[7], 64)
baseInfo.FileCount = fileCountInt
baseInfo.FileSizeMB = fileSizeFloat
baseInfo.ChunkSize = chunkSizeInt
baseInfo.Streams = streamsInt
baseInfo.TargetDatarate = datarateFloat
baseInfo.Protocoll = tsDetail[8]
baseInfo.Src = tsDetail[9]
baseInfo.Receiver = tsDetail[10]
}
tsDetail = tsDetailPattern4.FindStringSubmatch(newEntry.LogMessage)
if len(tsDetail) > 0 {
threadInt, _ := strconv.Atoi(strings.Split(tsDetail[1], "/")[0])
baseInfo.Lane = threadInt
baseInfo.Src = tsDetail[2]
baseInfo.Dest = tsDetail[3]
}
if strings.Contains(newEntry.LogMessage, "Transfer start") || strings.Contains(newEntry.LogMessage, "Transfer started,") {
baseInfo.StartTime = newEntry.Timestamp
}
if strings.Contains(newEntry.LogMessage, "Transfer stopped local state=finished") {
baseInfo.EndTime = newEntry.Timestamp
}
if transferID != "" {
baseInfo.TransferID = transferID
} else {
baseInfo.TransferID = "no_transfer_id"
}
if !baseInfo.StartTime.IsZero() {
newEntry.ServiceInformation = baseInfo
}
return newEntry, nil
}

179
patterns/repository.go Normal file
View file

@ -0,0 +1,179 @@
// package patterns
// import (
// "fmt"
// "regexp"
// "sync"
// "gopkg.in/yaml.v3"
// "os"
// )
// type PatternConfig struct {
// Patterns map[string]map[string]PatternDefinition `yaml:"patterns"`
// }
// type PatternDefinition struct {
// Regex string `yaml:"regex"`
// Description string `yaml:"description,omitempty"`
// }
// type Repository struct {
// compiledPatterns map[string]map[string]*regexp.Regexp
// mu sync.RWMutex
// }
// var (
// instance *Repository
// once sync.Once
// )
// func GetInstance() *Repository {
// once.Do(func() {
// instance = &Repository{
// compiledPatterns: make(map[string]map[string]*regexp.Regexp),
// }
// })
// return instance
// }
// func (r *Repository) Load(path string) error {
// r.mu.Lock()
// defer r.mu.Unlock()
// data, err := os.ReadFile(path)
// if err != nil {
// return fmt.Errorf("failed to read pattern config: %w", err)
// }
// var config PatternConfig
// if err := yaml.Unmarshal(data, &config); err != nil {
// return fmt.Errorf("failed to parse pattern config: %w", err)
// }
// for service, patterns := range config.Patterns {
// if _, exists := r.compiledPatterns[service]; !exists {
// r.compiledPatterns[service] = make(map[string]*regexp.Regexp)
// }
// for name, def := range patterns {
// compiled, err := regexp.Compile(def.Regex)
// if err != nil {
// return fmt.Errorf("invalid regex for %s/%s: %w", service, name, err)
// }
// r.compiledPatterns[service][name] = compiled
// }
// }
// return nil
// }
// func (r *Repository) Get(service string, name string) (*regexp.Regexp, error) {
// r.mu.RLock()
// defer r.mu.RUnlock()
// if svcPatterns, ok := r.compiledPatterns[service]; ok {
// if pattern, ok := svcPatterns[name]; ok {
// return pattern, nil
// }
// }
// return nil, fmt.Errorf("pattern not found: %s/%s", service, name)
// }
// func (r *Repository) MustGet(service string, name string) *regexp.Regexp {
// p, err := r.Get(service, name)
// if err != nil {
// panic(err)
// }
// return p
// }
package patterns
import (
"fmt"
"os"
"regexp"
"sync"
"gopkg.in/yaml.v3"
)
// Struktur der YAML Datei
type Config struct {
Patterns map[string]ServiceConfig `yaml:"patterns"`
}
type ServiceConfig struct {
Extractors []ExtractorConfig `yaml:"extractors"`
}
type ExtractorConfig struct {
Name string `yaml:"name"`
Regex string `yaml:"regex"`
Fields map[string]string `yaml:"fields"` // Name -> Typ (int, float, string)
}
// Interne kompilierte Struktur
type CompiledExtractor struct {
Name string
Pattern *regexp.Regexp
Fields map[string]string
}
type Repository struct {
services map[string][]CompiledExtractor
mu sync.RWMutex
}
var (
instance *Repository
once sync.Once
)
func GetInstance() *Repository {
once.Do(func() {
instance = &Repository{
services: make(map[string][]CompiledExtractor),
}
})
return instance
}
func (r *Repository) Load(path string) error {
r.mu.Lock()
defer r.mu.Unlock()
data, err := os.ReadFile(path)
if err != nil {
return fmt.Errorf("failed to read patterns file: %w", err)
}
var cfg Config
if err := yaml.Unmarshal(data, &cfg); err != nil {
return fmt.Errorf("failed to parse yaml: %w", err)
}
for service, svcCfg := range cfg.Patterns {
var compiledList []CompiledExtractor
for _, ext := range svcCfg.Extractors {
re, err := regexp.Compile(ext.Regex)
if err != nil {
return fmt.Errorf("invalid regex in service %s extractor %s: %w", service, ext.Name, err)
}
compiledList = append(compiledList, CompiledExtractor{
Name: ext.Name,
Pattern: re,
Fields: ext.Fields,
})
}
r.services[service] = compiledList
}
return nil
}
func (r *Repository) GetExtractors(service string) []CompiledExtractor {
r.mu.RLock()
defer r.mu.RUnlock()
return r.services[service]
}

View file

@ -11,17 +11,19 @@ import (
"strconv" "strconv"
"strings" "strings"
"time" "time"
"tixel_watch/models" "watch-tool/models"
"tixel_watch/parser" "watch-tool/parser"
) )
type ServiceMonitor struct { type ServiceMonitor struct {
config ServiceConfig config ServiceConfig
hostname string
} }
func NewServiceMonitor(config ServiceConfig) *ServiceMonitor { func NewServiceMonitor(config ServiceConfig, hostname string) *ServiceMonitor {
return &ServiceMonitor{ return &ServiceMonitor{
config: config, config: config,
hostname: hostname,
} }
} }
@ -50,7 +52,7 @@ func (sm *ServiceMonitor) Start(ctx context.Context, out chan<- models.LogMessag
} }
}() }()
parser := NewJournalEntryParser(sm.config.Name, sm.config.Service) parser := NewJournalEntryParser(sm.config.Name, sm.config.Service, sm.hostname)
for scanner.Scan() { for scanner.Scan() {
select { select {
@ -109,12 +111,14 @@ func (sm *ServiceMonitor) buildJournalctlArgs() []string {
type JournalEntryParser struct { type JournalEntryParser struct {
serviceName string serviceName string
unitName string unitName string
hostname string
} }
func NewJournalEntryParser(serviceName, unitName string) *JournalEntryParser { func NewJournalEntryParser(serviceName, unitName, hostname string) *JournalEntryParser {
return &JournalEntryParser{ return &JournalEntryParser{
serviceName: serviceName, serviceName: serviceName,
unitName: unitName, unitName: unitName,
hostname: hostname,
} }
} }
@ -124,7 +128,7 @@ func (jep *JournalEntryParser) Parse(jsonLine string) (models.LogMessage, error)
return models.LogMessage{}, fmt.Errorf("JSON unmarshal error: %w", err) return models.LogMessage{}, fmt.Errorf("JSON unmarshal error: %w", err)
} }
entry := models.NewLogMessage("service_log", hostname) entry := models.NewLogMessage("service_log", jep.hostname)
entry.Service = jep.serviceName entry.Service = jep.serviceName
entry.Unit = jep.unitName entry.Unit = jep.unitName
@ -211,7 +215,7 @@ func (jep *JournalEntryParser) extractSystemdFields(journalData map[string]any,
} }
func (jep *JournalEntryParser) parseServiceSpecific(entry models.LogMessage) models.LogMessage { func (jep *JournalEntryParser) parseServiceSpecific(entry models.LogMessage) models.LogMessage {
logParser, err := parser.New(jep.serviceName, "custom") logParser, err := parser.New(jep.serviceName, "custom", jep.hostname)
if err != nil { if err != nil {
slog.Error("cannot get service specific parser") slog.Error("cannot get service specific parser")
return entry return entry
@ -235,284 +239,3 @@ var (
tsDetailPattern4 = regexp.MustCompile(`out: Start transfer (?P<thread>\d+/\d+), src=(?P<src>[^ ]*) dest=(?P<dest>[^ ]*) item\[0\]=(?P<item0>[^ ]*) count=(?P<count>\d+)`) tsDetailPattern4 = regexp.MustCompile(`out: Start transfer (?P<thread>\d+/\d+), src=(?P<src>[^ ]*) dest=(?P<dest>[^ ]*) item\[0\]=(?P<item0>[^ ]*) count=(?P<count>\d+)`)
nginxAccessPattern = regexp.MustCompile(`^(\S+)\s+\S+\s+(\S+)\s+\[([^\]]+)\]\s+"([^"]+)"\s+(\d+)\s+(\d+|-)\s*(?:"([^"]*)"\s+"([^"]*)")?`) nginxAccessPattern = regexp.MustCompile(`^(\S+)\s+\S+\s+(\S+)\s+\[([^\]]+)\]\s+"([^"]+)"\s+(\d+)\s+(\d+|-)\s*(?:"([^"]*)"\s+"([^"]*)")?`)
) )
// func parseTixstreamService(entry models.LogMessage) models.LogMessage {
// newEntry := entry
// var baseInfo models.TSTransferInfo
// matches := tsServicePattern.FindStringSubmatch(newEntry.LogMessage)
// if len(matches) > 0 {
// timestamp := strings.Join(strings.Split(matches[2], " "), "T")
// newEntry.LogLevel = strings.TrimSpace(matches[1])
// if newEntry.Timestamp.IsZero() {
// timeParsed, err := parseRFC3339WithOptionalZ(timestamp)
// if err != nil {
// slog.Error("cant parse time string", "error", err)
// }
// newEntry.Timestamp = timeParsed
// }
// newEntry.LogMessage = strings.TrimSpace(matches[3])
// }
// trNameMatch := tsTransferIDPattern.FindStringSubmatch(newEntry.LogMessage)
// var transferID string
// if len(trNameMatch) > 0 {
// transferID = trNameMatch[1]
// newEntry.LogMessage = trNameMatch[2]
// split := strings.Fields(trNameMatch[2])
// switch split[0] {
// case "in:":
// baseInfo.Direction = "incoming"
// case "out:":
// baseInfo.Direction = "outgoing"
// }
// }
// msg := strings.ReplaceAll(newEntry.LogMessage, " ", " ")
// parts := strings.Fields(msg)
// if len(parts) < 5 {
// return newEntry
// }
// tsDetail := tsDetailPattern1.FindStringSubmatch(newEntry.LogMessage)
// if len(tsDetail) > 0 {
// threadInt, _ := strconv.Atoi(strings.Split(tsDetail[1], "/")[0])
// buffersInt, _ := strconv.Atoi(tsDetail[2])
// fileCountInt, _ := strconv.Atoi(tsDetail[3])
// chunkSizeInt, _ := strconv.Atoi(tsDetail[5])
// streamsInt, _ := strconv.Atoi(tsDetail[6])
// datarateFloat, _ := strconv.ParseFloat(tsDetail[7], 64)
// fileSizeFloat, _ := strconv.ParseFloat(tsDetail[4], 64)
// baseInfo.Lane = threadInt
// baseInfo.Buffers = buffersInt
// baseInfo.FileCount = fileCountInt
// baseInfo.FileSizeMB = fileSizeFloat
// baseInfo.ChunkSize = chunkSizeInt
// baseInfo.Streams = streamsInt
// baseInfo.TargetDatarate = datarateFloat
// baseInfo.Protocoll = tsDetail[8]
// baseInfo.Dest = tsDetail[9]
// baseInfo.SenderID = tsDetail[10]
// }
// tsDetail = tsDetailPattern2.FindStringSubmatch(newEntry.LogMessage)
// if len(tsDetail) > 0 {
// baseInfo.Target = tsDetail[1]
// }
// tsDetail = tsDetailPattern3.FindStringSubmatch(newEntry.LogMessage)
// if len(tsDetail) > 0 {
// threadInt, _ := strconv.Atoi(strings.Split(tsDetail[1], "/")[0])
// buffersInt, _ := strconv.Atoi(tsDetail[2])
// fileCountInt, _ := strconv.Atoi(tsDetail[3])
// fileSizeFloat, _ := strconv.ParseFloat(tsDetail[4], 64)
// chunkSizeInt, _ := strconv.Atoi(tsDetail[5])
// streamsInt, _ := strconv.Atoi(tsDetail[6])
// datarateFloat, _ := strconv.ParseFloat(tsDetail[7], 64)
// baseInfo.Lane = threadInt
// baseInfo.Buffers = buffersInt
// baseInfo.FileCount = fileCountInt
// baseInfo.FileSizeMB = fileSizeFloat
// baseInfo.ChunkSize = chunkSizeInt
// baseInfo.Streams = streamsInt
// baseInfo.TargetDatarate = datarateFloat
// baseInfo.Protocoll = tsDetail[8]
// baseInfo.Src = tsDetail[9]
// baseInfo.Receiver = tsDetail[10]
// }
// tsDetail = tsDetailPattern4.FindStringSubmatch(newEntry.LogMessage)
// if len(tsDetail) > 0 {
// threadInt, _ := strconv.Atoi(strings.Split(tsDetail[1], "/")[0])
// baseInfo.Lane = threadInt
// baseInfo.Src = tsDetail[2]
// baseInfo.Dest = tsDetail[3]
// }
// if strings.Contains(newEntry.LogMessage, "Transfer start") || strings.Contains(newEntry.LogMessage, "Transfer started,") {
// baseInfo.StartTime = newEntry.Timestamp
// } else {
// baseInfo.StartTime = time.Now()
// }
// if strings.Contains(newEntry.LogMessage, "Transfer stopped local state=finished") {
// baseInfo.EndTime = newEntry.Timestamp
// } else {
// baseInfo.EndTime = baseInfo.StartTime
// }
// if transferID != "" {
// baseInfo.TransferID = transferID
// } else {
// baseInfo.TransferID = "no_transfer_id"
// }
// newEntry.ServiceInformation = baseInfo
// return newEntry
// }
// func parseTJMService(entry models.LogMessage) models.LogMessage {
// newEntry := entry
// var baseInfo models.TJMTransferInfo
// logContent := entry.LogMessage
// msg := strings.TrimSpace(logContent)
// msg = strings.ReplaceAll(msg, " ", " ")
// msg = strings.ReplaceAll(msg, "---", "")
// msg = strings.ReplaceAll(msg, " ", " ")
// parts := strings.Fields(msg)
// if len(parts) < 4 {
// return newEntry
// }
// matches := tjmServicePattern.FindStringSubmatch(logContent)
// if len(matches) > 0 {
// timestamp := strings.Join(strings.Split(matches[2], " "), "T")
// newEntry.LogLevel = strings.TrimSpace(matches[1])
// if newEntry.Timestamp.IsZero() {
// timeParsed, err := parseRFC3339WithOptionalZ(timestamp)
// if err != nil {
// slog.Error("cant parse time string", "error", err)
// }
// newEntry.Timestamp = timeParsed
// }
// newEntry.LogLevel = strings.TrimSpace(matches[2])
// newEntry.LogMessage = strings.TrimSpace(matches[8])
// baseInfo = models.TJMTransferInfo{
// ProcessID: strings.TrimSpace(matches[3]),
// CorrelationID: strings.TrimSpace(matches[4]),
// Username: strings.TrimSpace(matches[5]),
// ThreadID: strings.TrimSpace(matches[6]),
// JavaClass: strings.TrimSpace(matches[7]),
// }
// } else {
// newEntry.LogMessage = logContent
// }
// trNameMatch := tjmTransferNamePattern.FindStringSubmatch(newEntry.LogMessage)
// var transferName string
// var transferID string
// if len(trNameMatch) > 0 {
// transferName = trNameMatch[1]
// newEntry.LogMessage = trNameMatch[2]
// if strings.Contains(trNameMatch[1], "-in") {
// baseInfo.Direction = "incoming"
// }
// if strings.Contains(trNameMatch[1], "-out") {
// baseInfo.Direction = "outgoing"
// }
// }
// trIDMatch := tjmTransferIDPattern1.FindStringSubmatch(newEntry.LogMessage)
// if len(trIDMatch) > 0 {
// transferID = trIDMatch[1]
// }
// trIDMatch = tjmTransferIDPattern2.FindStringSubmatch(newEntry.LogMessage)
// if len(trIDMatch) > 0 {
// transferID = trIDMatch[2]
// }
// if transferID != "" {
// baseInfo.TransferID = transferID
// } else if transferName != "" {
// baseInfo.TransferID = transferName
// } else {
// baseInfo.TransferID = "no_transfer_id"
// }
// baseInfo.StartTime = newEntry.Timestamp
// baseInfo.StartTime = newEntry.Timestamp
// newEntry.ServiceInformation = baseInfo
// return newEntry
// }
// func parseAMService(entry models.LogMessage) models.LogMessage {
// newEntry := entry
// logContent := newEntry.LogMessage
// matches := amServicePattern.FindStringSubmatch(strings.TrimSpace(logContent))
// if len(matches) != 7 {
// return newEntry
// }
// timestampStr := strings.Join(strings.Split(matches[1], " "), "T")
// if newEntry.Timestamp.IsZero() {
// timeParsed, err := parseRFC3339WithOptionalZ(timestampStr)
// if err != nil {
// slog.Error("cant parse time string", "error", err)
// }
// newEntry.Timestamp = timeParsed
// }
// baseInfo := models.AMBaseInfo{
// ProcessID: matches[3],
// ThreadID: strings.TrimSpace(matches[4]),
// LoggerName: matches[5],
// }
// newEntry.LogLevel = matches[2]
// newEntry.LogMessage = matches[6]
// newEntry.ServiceInformation = baseInfo
// return newEntry
// }
// func parseTCCService(entry models.LogMessage) models.LogMessage {
// newEntry := entry
// logContent := newEntry.LogMessage
// matches := tccServicePattern.FindStringSubmatch(logContent)
// if len(matches) != 7 {
// return newEntry
// }
// timestampStr := strings.Join(strings.Split(matches[1], " "), "T")
// if newEntry.Timestamp.IsZero() {
// timeParsed, err := parseRFC3339WithOptionalZ(timestampStr)
// if err != nil {
// slog.Error("cant parse time string", "error", err)
// }
// newEntry.Timestamp = timeParsed
// }
// baseInfo := models.TCCBaseInfo{
// ProcessID: matches[3],
// ThreadID: strings.TrimSpace(matches[4]),
// LoggerName: matches[5],
// }
// newEntry.LogLevel = matches[2]
// newEntry.LogMessage = matches[6]
// newEntry.ServiceInformation = baseInfo
// return newEntry
// }
// func parseNginxService(entry models.LogMessage) models.LogMessage {
// newEntry := entry
// matches := nginxAccessPattern.FindStringSubmatch(strings.TrimSpace(entry.LogMessage))
// if len(matches) < 7 {
// return newEntry
// }
// statusCode, err := strconv.ParseInt(matches[5], 10, 64)
// if err != nil {
// slog.Error("cant parse statuscode", "error", err)
// }
// bytesSend, err := strconv.ParseInt(matches[6], 10, 64)
// if err != nil {
// slog.Error("cant parse bytessend", "error", err)
// }
// baseInfo := models.NGinXBaseInfo{
// ClientIP: matches[1],
// RemoteUser: matches[2],
// Request: matches[4],
// StatusCode: int(statusCode),
// BytesSend: int(bytesSend),
// }
// if len(matches) > 7 && matches[7] != "" {
// baseInfo.Referer = matches[7]
// }
// if len(matches) > 8 && matches[8] != "" {
// baseInfo.UserAgent = matches[8]
// }
// if requestParts := strings.Fields(matches[4]); len(requestParts) >= 3 {
// baseInfo.HTTPMethod = requestParts[0]
// baseInfo.RequestURI = requestParts[1]
// baseInfo.HTTPVersion = requestParts[2]
// }
// newEntry.ServiceInformation = baseInfo
// return newEntry
// }
// func parseRFC3339WithOptionalZ(timeStr string) (time.Time, error) {
// if !strings.HasSuffix(timeStr, "Z") && !strings.ContainsAny(timeStr[len(timeStr)-6:], "+-") {
// timeStr += "Z"
// }
// return time.Parse(time.RFC3339Nano, timeStr)
// }

View file

@ -3,7 +3,7 @@ package main
import ( import (
"context" "context"
"time" "time"
"tixel_watch/models" "watch-tool/models"
) )
type StorageInterface interface { type StorageInterface interface {

View file

@ -12,9 +12,8 @@ import (
"strings" "strings"
"syscall" "syscall"
"time" "time"
"tixel_watch/models" "watch-tool/models"
"github.com/elastic/go-elasticsearch/v8"
"github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/disk" "github.com/shirou/gopsutil/disk"
"github.com/shirou/gopsutil/host" "github.com/shirou/gopsutil/host"
@ -31,24 +30,24 @@ type SystemMetricsCollector struct {
lastNetworkStats map[string]models.NetworkStat lastNetworkStats map[string]models.NetworkStat
lastDiskStats map[string]models.DiskIOStat lastDiskStats map[string]models.DiskIOStat
lastMeasureTime time.Time lastMeasureTime time.Time
hostname string
} }
func NewSystemMetricsCollector(config SystemMetrics, pollInterval int) *SystemMetricsCollector { func NewSystemMetricsCollector(config SystemMetrics, pollInterval int, hostname string) *SystemMetricsCollector {
return &SystemMetricsCollector{ return &SystemMetricsCollector{
config: config, config: config,
pollInterval: pollInterval, pollInterval: pollInterval,
lastNetworkStats: make(map[string]models.NetworkStat), lastNetworkStats: make(map[string]models.NetworkStat),
lastDiskStats: make(map[string]models.DiskIOStat), lastDiskStats: make(map[string]models.DiskIOStat),
lastMeasureTime: time.Now(), lastMeasureTime: time.Now(),
hostname: hostname,
} }
} }
func (smc *SystemMetricsCollector) Start(ctx context.Context, es *elasticsearch.Client, baseIndex string) { func (smc *SystemMetricsCollector) Start(ctx context.Context, storage StorageInterface, logChan chan<- models.LogMessage) {
ticker := time.NewTicker(time.Duration(smc.pollInterval) * time.Second) ticker := time.NewTicker(time.Duration(smc.pollInterval) * time.Second)
defer ticker.Stop() defer ticker.Stop()
sender := NewElasticsearchSender(es)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -61,15 +60,23 @@ func (smc *SystemMetricsCollector) Start(ctx context.Context, es *elasticsearch.
continue continue
} }
if err := sender.SendSystemMetrics(baseIndex, metrics); err != nil { entry := models.NewLogMessage("system_metrics", smc.hostname)
slog.Error("error sending system metrics", "error", err) entry.Service = "system-metrics"
entry.LogLevel = "Info"
entry.SystemMetrics = metrics
select {
case logChan <- entry:
case <-ctx.Done():
return
default:
slog.Warn("Log channel is full, system metrics dropped")
} }
} }
} }
} }
func (smc *SystemMetricsCollector) collectMetrics() (models.SystemResources, error) { func (smc *SystemMetricsCollector) collectMetrics() (models.SystemResources, error) {
result := models.NewSystemResources(hostname) result := models.NewSystemResources(smc.hostname)
var err error var err error

View file

@ -1,40 +0,0 @@
package main
import (
"context"
"log/slog"
"time"
"tixel_watch/models"
)
func (smc *SystemMetricsCollector) StartV2(ctx context.Context, storage StorageInterface, logChan chan<- models.LogMessage) {
ticker := time.NewTicker(time.Duration(smc.pollInterval) * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
slog.Info("System metrics collector stopped")
return
case <-ticker.C:
metrics, err := smc.collectMetrics()
if err != nil {
slog.Error("error collecting system metrics", "error", err)
continue
}
entry := models.NewLogMessage("system_metrics", hostname)
entry.Service = "system-metrics"
entry.LogLevel = "Info"
entry.SystemMetrics = metrics
select {
case logChan <- entry:
case <-ctx.Done():
return
default:
slog.Warn("Log channel is full, system metrics dropped")
}
}
}
}

View file

@ -7,35 +7,46 @@ import (
"log/slog" "log/slog"
"net/http" "net/http"
"os/exec" "os/exec"
"slices"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"watch-tool/models"
"github.com/elastic/go-elasticsearch/v8"
) )
type WebService struct { type WebService struct {
server *http.Server server *http.Server
esClient *elasticsearch.Client storage StorageInterface
config *Config config *Config
} }
func NewWebService(config *Config, esClient *elasticsearch.Client) *WebService { func LoggingMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
slog.Debug("WebService", "Remote-Address", r.RemoteAddr, "Method", r.Method, "Path", r.URL.Path)
next.ServeHTTP(w, r)
})
}
func NewWebService(config *Config, storage StorageInterface) *WebService {
mux := http.NewServeMux() mux := http.NewServeMux()
ws := &WebService{ ws := &WebService{
esClient: esClient, storage: storage,
config: config, config: config,
} }
mux.HandleFunc("/export", ws.handleExport) mux.HandleFunc("GET /health", ws.handleHealth)
mux.HandleFunc("/health", ws.handleHealth) mux.HandleFunc("GET /logs", ws.handleLogs)
mux.HandleFunc("/indices", ws.handleIndices) mux.HandleFunc("GET /export", ws.handleExport)
mux.HandleFunc("GET /stats", ws.handleStats)
mux.HandleFunc("GET /stats/{service}", ws.handleServiceStats)
loggedMux := LoggingMiddleware(mux)
addr := fmt.Sprintf("%s:%d", config.WebService.Host, config.WebService.Port) addr := fmt.Sprintf("%s:%d", config.WebService.Host, config.WebService.Port)
ws.server = &http.Server{ ws.server = &http.Server{
Addr: addr, Addr: addr,
Handler: mux, Handler: loggedMux,
ReadTimeout: 30 * time.Second, ReadTimeout: 30 * time.Second,
WriteTimeout: 300 * time.Second, WriteTimeout: 300 * time.Second,
IdleTimeout: 60 * time.Second, IdleTimeout: 60 * time.Second,
@ -64,67 +75,30 @@ func (ws *WebService) Start(ctx context.Context) error {
return nil return nil
} }
func (ws *WebService) handleExport(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
indices := ws.parseIndicesParam(r)
if len(indices) == 0 {
http.Error(w, "No indices specified. Use ?indices=index1,index2", http.StatusBadRequest)
return
}
size := ws.parseSizeParam(r)
since := ws.parseSinceParam(r)
slog.Info("Export request received", "indices", indices, "size", size, "since", since)
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Disposition", "attachment; filename=elasticsearch_export.json")
exporter := NewElasticsearchExporter(ws.esClient)
if err := exporter.ExportToStream(r.Context(), indices, size, since, w); err != nil {
slog.Error("export error", "error", err)
http.Error(w, fmt.Sprintf("Export error: %v", err), http.StatusInternalServerError)
return
}
slog.Info("Export completed successfully", "indices", indices)
}
func (ws *WebService) handleHealth(w http.ResponseWriter, r *http.Request) { func (ws *WebService) handleHealth(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) status := map[string]any{
return "status": "healthy",
"timestamp": time.Now(),
"storage": "sqlite",
} }
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel() defer cancel()
res, err := ws.esClient.Info(ws.esClient.Info.WithContext(ctx)) _, err := ws.storage.Query(ctx, StorageQuery{
Limit: 1,
})
if err != nil { if err != nil {
status["storage_status"] = "unhealthy"
status["storage_error"] = err.Error()
w.WriteHeader(http.StatusServiceUnavailable) w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(map[string]any{ } else {
"status": "unhealthy", status["storage_status"] = "healthy"
"error": err.Error(),
})
return
} }
res.Body.Close()
if res.IsError() {
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(map[string]any{
"status": "unhealthy",
"error": res.String(),
})
return
}
statusMap := make(map[string]any) statusMap := make(map[string]any)
statusMap["elasticsearch"] = map[string]any{"status": "healthy", "timestamp": time.Now()} statusMap["tixel-watch"] = status
for _, service := range ws.config.Services { for _, service := range ws.config.Services {
statusCommand := []string{"sudo", "systemctl", "status", service.Name, "--no-pager"} statusCommand := []string{"sudo", "systemctl", "status", service.Name, "--no-pager"}
@ -150,89 +124,267 @@ func (ws *WebService) handleHealth(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(statusMap) json.NewEncoder(w).Encode(statusMap)
} }
func (ws *WebService) handleIndices(w http.ResponseWriter, r *http.Request) { func (ws *WebService) handleLogs(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) query := ws.parseLogsQuery(r)
ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
defer cancel()
entries, err := ws.storage.Query(ctx, query)
if err != nil {
http.Error(w, fmt.Sprintf("Query error: %v", err), http.StatusInternalServerError)
return return
} }
response := map[string]any{
"entries": entries,
"count": len(entries),
"query": query,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
func (ws *WebService) handleExport(w http.ResponseWriter, r *http.Request) {
query := ws.parseLogsQuery(r)
ctx, cancel := context.WithTimeout(r.Context(), 300*time.Second)
defer cancel()
entries, err := ws.storage.Query(ctx, query)
if err != nil {
http.Error(w, fmt.Sprintf("Export query error: %v", err), http.StatusInternalServerError)
return
}
filename := fmt.Sprintf("tixel_export_%s.json", time.Now().Format("20060102_150405"))
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", filename))
exportData := map[string]any{
"export_info": map[string]any{
"timestamp": time.Now(),
"entry_count": len(entries),
"query": query,
"exported_by": "tixel-watch",
},
"entries": entries,
}
if err := json.NewEncoder(w).Encode(exportData); err != nil {
slog.Error("Failed to encode export data", "error", err)
return
}
slog.Info("Data exported", "count", len(entries), "query", query)
}
func (ws *WebService) handleStats(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel() defer cancel()
res, err := ws.esClient.Cat.Indices( allEntries, err := ws.storage.Query(ctx, StorageQuery{})
ws.esClient.Cat.Indices.WithContext(ctx),
ws.esClient.Cat.Indices.WithFormat("json"),
)
if err != nil { if err != nil {
http.Error(w, fmt.Sprintf("Error fetching indices: %v", err), http.StatusInternalServerError) http.Error(w, fmt.Sprintf("Stats query error: %v", err), http.StatusInternalServerError)
return
}
defer res.Body.Close()
if res.IsError() {
http.Error(w, fmt.Sprintf("Elasticsearch error: %s", res.String()), http.StatusInternalServerError)
return return
} }
var indices []map[string]any recentEntries, err := ws.storage.Query(ctx, StorageQuery{
if err := json.NewDecoder(res.Body).Decode(&indices); err != nil { StartTime: time.Now().Add(-time.Hour),
http.Error(w, fmt.Sprintf("Error decoding response: %v", err), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]any{
"indices": indices,
"count": len(indices),
}) })
} if err != nil {
slog.Error("Failed to query recent entries", "error", err)
func (ws *WebService) parseIndicesParam(r *http.Request) []string { recentEntries = []models.LogMessage{}
indicesParam := r.URL.Query().Get("indices")
if indicesParam == "" {
return nil
} }
indices := strings.Split(indicesParam, ",") stats := map[string]any{
var result []string "total_entries": len(allEntries),
for _, index := range indices { "recent_entries": len(recentEntries),
index = strings.TrimSpace(index) "timestamp": time.Now(),
if index != "" { }
result = append(result, index)
typeCounts := make(map[string]int)
serviceCounts := make(map[string]int)
toolCounts := make(map[string]int)
for _, entry := range allEntries {
typeCounts[entry.Type]++
if entry.Service != "" {
serviceCounts[entry.Service]++
}
if entry.Tool != "" {
toolCounts[entry.Tool]++
} }
} }
return result stats["by_type"] = typeCounts
stats["by_service"] = serviceCounts
stats["by_tool"] = toolCounts
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(stats)
} }
func (ws *WebService) parseSinceParam(r *http.Request) int { func (ws *WebService) handleServiceStats(w http.ResponseWriter, r *http.Request) {
sinceParam := r.URL.Query().Get("since") service := r.PathValue("service")
if sinceParam == "" { if service == "" {
return 0 http.Error(w, "Service parameter is missing", http.StatusBadRequest)
return
} }
since, err := strconv.Atoi(sinceParam) timeRangeStr := r.URL.Query().Get("time_range")
var startTime time.Time
if timeRangeStr == "" {
startTime = time.Now().Add(-24 * time.Hour)
} else {
duration, err := time.ParseDuration(timeRangeStr)
if err != nil {
http.Error(w, fmt.Sprintf("Invalid time_range: %v", err), http.StatusBadRequest)
return
}
startTime = time.Now().Add(-duration)
}
query := StorageQuery{
Service: service,
StartTime: startTime,
Limit: 0,
OrderDesc: false,
}
ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
defer cancel()
entries, err := ws.storage.Query(ctx, query)
if err != nil { if err != nil {
return 0 slog.Error("Failed to query service stats", "service", service, "error", err)
http.Error(w, fmt.Sprintf("Query error: %v", err), http.StatusInternalServerError)
return
}
uniqueTransfersTotal := make(map[string]struct{})
uniqueTransfersIncoming := make(map[string]struct{})
uniqueTransfersOutgoing := make(map[string]struct{})
uniqueTransfersNil := make(map[string]struct{})
for _, entry := range entries {
var identifier string
var direction string
switch v := entry.ServiceInformation.(type) {
case models.TSTransferInfo:
identifier = v.TransferID
direction = v.Direction
case *models.TSTransferInfo:
identifier = v.TransferID
direction = v.Direction
case models.TJMTransferInfo:
identifier = v.TransferID
direction = v.Direction
case *models.TJMTransferInfo:
identifier = v.TransferID
direction = v.Direction
case map[string]any:
identifier, _ = v["transfer_identifier"].(string)
direction, _ = v["direction"].(string)
default:
continue
}
if identifier != "" {
uniqueTransfersTotal[identifier] = struct{}{}
switch strings.ToLower(direction) {
case "incoming":
uniqueTransfersIncoming[identifier] = struct{}{}
case "outgoing":
uniqueTransfersOutgoing[identifier] = struct{}{}
default:
uniqueTransfersNil[identifier] = struct{}{}
}
}
} }
return since stats := map[string]any{
"service": service,
"start_time": startTime,
"end_time": time.Now(),
"transfer_counts": map[string]any{
"total": len(uniqueTransfersTotal),
"incoming": len(uniqueTransfersIncoming),
"outgoing": len(uniqueTransfersOutgoing),
"nil_or_unknown_direction": len(uniqueTransfersNil),
},
"entry_count": len(entries),
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(stats)
} }
func (ws *WebService) parseSizeParam(r *http.Request) int { func (ws *WebService) parseLogsQuery(r *http.Request) StorageQuery {
sizeParam := r.URL.Query().Get("size") query := StorageQuery{
if sizeParam == "" { Limit: 100,
return 1000 OrderBy: "timestamp",
OrderDesc: true,
} }
size, err := strconv.Atoi(sizeParam) if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
if err != nil || size <= 0 { if limit, err := strconv.Atoi(limitStr); err == nil && limit > 0 {
return 1000 if limit > 10000 {
limit = 10000
}
query.Limit = limit
}
} }
if size > 10000 { if offsetStr := r.URL.Query().Get("offset"); offsetStr != "" {
size = 10000 if offset, err := strconv.Atoi(offsetStr); err == nil && offset >= 0 {
query.Offset = offset
}
} }
return size if startTime := r.URL.Query().Get("start_time"); startTime != "" {
if t, err := time.Parse(time.RFC3339, startTime); err == nil {
query.StartTime = t
}
}
if endTime := r.URL.Query().Get("end_time"); endTime != "" {
if t, err := time.Parse(time.RFC3339, endTime); err == nil {
query.EndTime = t
}
}
if service := r.URL.Query().Get("service"); service != "" {
query.Service = strings.TrimSpace(service)
}
if tool := r.URL.Query().Get("tool"); tool != "" {
query.Tool = strings.TrimSpace(tool)
}
if logLevel := r.URL.Query().Get("log_level"); logLevel != "" {
query.LogLevel = strings.TrimSpace(logLevel)
}
if entryType := r.URL.Query().Get("type"); entryType != "" {
query.Type = strings.TrimSpace(entryType)
}
if orderBy := r.URL.Query().Get("order_by"); orderBy != "" {
validFields := []string{"timestamp", "service", "tool", "type", "log_level"}
if slices.Contains(validFields, orderBy) {
query.OrderBy = orderBy
}
}
if orderDesc := r.URL.Query().Get("order_desc"); orderDesc != "" {
query.OrderDesc = orderDesc == "true"
}
return query
} }

View file

@ -1,390 +0,0 @@
package main
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"os/exec"
"slices"
"strconv"
"strings"
"time"
"tixel_watch/models"
)
type WebServiceV2 struct {
server *http.Server
storage StorageInterface
config *Config
}
func LoggingMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
slog.Debug("WebService", "Remote-Address", r.RemoteAddr, "Method", r.Method, "Path", r.URL.Path)
next.ServeHTTP(w, r)
})
}
func NewWebServiceV2(config *Config, storage StorageInterface) *WebServiceV2 {
mux := http.NewServeMux()
ws := &WebServiceV2{
storage: storage,
config: config,
}
mux.HandleFunc("GET /health", ws.handleHealth)
mux.HandleFunc("GET /logs", ws.handleLogs)
mux.HandleFunc("GET /export", ws.handleExport)
mux.HandleFunc("GET /stats", ws.handleStats)
mux.HandleFunc("GET /stats/{service}", ws.handleServiceStats)
loggedMux := LoggingMiddleware(mux)
addr := fmt.Sprintf("%s:%d", config.WebService.Host, config.WebService.Port)
ws.server = &http.Server{
Addr: addr,
Handler: loggedMux,
ReadTimeout: 30 * time.Second,
WriteTimeout: 300 * time.Second,
IdleTimeout: 60 * time.Second,
}
return ws
}
func (ws *WebServiceV2) Start(ctx context.Context) error {
go func() {
<-ctx.Done()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := ws.server.Shutdown(shutdownCtx); err != nil {
slog.Error("web service shutdown error", "error", err)
}
}()
slog.Info("Starting web service", "address", ws.server.Addr)
if err := ws.server.ListenAndServe(); err != http.ErrServerClosed {
return fmt.Errorf("web service error: %w", err)
}
return nil
}
func (ws *WebServiceV2) handleHealth(w http.ResponseWriter, r *http.Request) {
status := map[string]any{
"status": "healthy",
"timestamp": time.Now(),
"storage": "sqlite",
}
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
_, err := ws.storage.Query(ctx, StorageQuery{
Limit: 1,
})
if err != nil {
status["storage_status"] = "unhealthy"
status["storage_error"] = err.Error()
w.WriteHeader(http.StatusServiceUnavailable)
} else {
status["storage_status"] = "healthy"
}
statusMap := make(map[string]any)
statusMap["tixel-watch"] = status
for _, service := range ws.config.Services {
statusCommand := []string{"sudo", "systemctl", "status", service.Name, "--no-pager"}
if service.Enabled {
serviceStatus, err := exec.Command(statusCommand[0], statusCommand[1:]...).Output()
if err != nil {
slog.Error("error executing status command", "error", err)
continue
}
lines := strings.SplitSeq(string(serviceStatus), "\n")
for line := range lines {
if strings.Contains(line, "Active:") {
serviceHealth, found := strings.CutPrefix(strings.TrimSpace(line), "Active:")
if found {
statusMap[service.Name] = map[string]any{"status": serviceHealth, "timestamp": time.Now()}
}
}
}
}
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(statusMap)
}
func (ws *WebServiceV2) handleLogs(w http.ResponseWriter, r *http.Request) {
query := ws.parseLogsQuery(r)
ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
defer cancel()
entries, err := ws.storage.Query(ctx, query)
if err != nil {
http.Error(w, fmt.Sprintf("Query error: %v", err), http.StatusInternalServerError)
return
}
response := map[string]any{
"entries": entries,
"count": len(entries),
"query": query,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
func (ws *WebServiceV2) handleExport(w http.ResponseWriter, r *http.Request) {
query := ws.parseLogsQuery(r)
ctx, cancel := context.WithTimeout(r.Context(), 300*time.Second)
defer cancel()
entries, err := ws.storage.Query(ctx, query)
if err != nil {
http.Error(w, fmt.Sprintf("Export query error: %v", err), http.StatusInternalServerError)
return
}
filename := fmt.Sprintf("tixel_export_%s.json", time.Now().Format("20060102_150405"))
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", filename))
exportData := map[string]any{
"export_info": map[string]any{
"timestamp": time.Now(),
"entry_count": len(entries),
"query": query,
"exported_by": "tixel-watch",
},
"entries": entries,
}
if err := json.NewEncoder(w).Encode(exportData); err != nil {
slog.Error("Failed to encode export data", "error", err)
return
}
slog.Info("Data exported", "count", len(entries), "query", query)
}
func (ws *WebServiceV2) handleStats(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
allEntries, err := ws.storage.Query(ctx, StorageQuery{})
if err != nil {
http.Error(w, fmt.Sprintf("Stats query error: %v", err), http.StatusInternalServerError)
return
}
recentEntries, err := ws.storage.Query(ctx, StorageQuery{
StartTime: time.Now().Add(-time.Hour),
})
if err != nil {
slog.Error("Failed to query recent entries", "error", err)
recentEntries = []models.LogMessage{}
}
stats := map[string]any{
"total_entries": len(allEntries),
"recent_entries": len(recentEntries),
"timestamp": time.Now(),
}
typeCounts := make(map[string]int)
serviceCounts := make(map[string]int)
toolCounts := make(map[string]int)
for _, entry := range allEntries {
typeCounts[entry.Type]++
if entry.Service != "" {
serviceCounts[entry.Service]++
}
if entry.Tool != "" {
toolCounts[entry.Tool]++
}
}
stats["by_type"] = typeCounts
stats["by_service"] = serviceCounts
stats["by_tool"] = toolCounts
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(stats)
}
func (ws *WebServiceV2) handleServiceStats(w http.ResponseWriter, r *http.Request) {
service := r.PathValue("service")
if service == "" {
http.Error(w, "Service parameter is missing", http.StatusBadRequest)
return
}
timeRangeStr := r.URL.Query().Get("time_range")
var startTime time.Time
if timeRangeStr == "" {
startTime = time.Now().Add(-24 * time.Hour)
} else {
duration, err := time.ParseDuration(timeRangeStr)
if err != nil {
http.Error(w, fmt.Sprintf("Invalid time_range: %v", err), http.StatusBadRequest)
return
}
startTime = time.Now().Add(-duration)
}
query := StorageQuery{
Service: service,
StartTime: startTime,
Limit: 0,
OrderDesc: false,
}
ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
defer cancel()
entries, err := ws.storage.Query(ctx, query)
if err != nil {
slog.Error("Failed to query service stats", "service", service, "error", err)
http.Error(w, fmt.Sprintf("Query error: %v", err), http.StatusInternalServerError)
return
}
uniqueTransfersTotal := make(map[string]struct{})
uniqueTransfersIncoming := make(map[string]struct{})
uniqueTransfersOutgoing := make(map[string]struct{})
uniqueTransfersNil := make(map[string]struct{})
for _, entry := range entries {
var identifier string
var direction string
switch v := entry.ServiceInformation.(type) {
case models.TSTransferInfo:
identifier = v.TransferID
direction = v.Direction
case *models.TSTransferInfo:
identifier = v.TransferID
direction = v.Direction
case models.TJMTransferInfo:
identifier = v.TransferID
direction = v.Direction
case *models.TJMTransferInfo:
identifier = v.TransferID
direction = v.Direction
case map[string]any:
identifier, _ = v["transfer_identifier"].(string)
direction, _ = v["direction"].(string)
default:
continue
}
if identifier != "" {
uniqueTransfersTotal[identifier] = struct{}{}
switch strings.ToLower(direction) {
case "incoming":
uniqueTransfersIncoming[identifier] = struct{}{}
case "outgoing":
uniqueTransfersOutgoing[identifier] = struct{}{}
default:
uniqueTransfersNil[identifier] = struct{}{}
}
}
}
stats := map[string]any{
"service": service,
"start_time": startTime,
"end_time": time.Now(),
"transfer_counts": map[string]any{
"total": len(uniqueTransfersTotal),
"incoming": len(uniqueTransfersIncoming),
"outgoing": len(uniqueTransfersOutgoing),
"nil_or_unknown_direction": len(uniqueTransfersNil),
},
"entry_count": len(entries),
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(stats)
}
func (ws *WebServiceV2) parseLogsQuery(r *http.Request) StorageQuery {
query := StorageQuery{
Limit: 100,
OrderBy: "timestamp",
OrderDesc: true,
}
if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
if limit, err := strconv.Atoi(limitStr); err == nil && limit > 0 {
if limit > 10000 {
limit = 10000
}
query.Limit = limit
}
}
if offsetStr := r.URL.Query().Get("offset"); offsetStr != "" {
if offset, err := strconv.Atoi(offsetStr); err == nil && offset >= 0 {
query.Offset = offset
}
}
if startTime := r.URL.Query().Get("start_time"); startTime != "" {
if t, err := time.Parse(time.RFC3339, startTime); err == nil {
query.StartTime = t
}
}
if endTime := r.URL.Query().Get("end_time"); endTime != "" {
if t, err := time.Parse(time.RFC3339, endTime); err == nil {
query.EndTime = t
}
}
if service := r.URL.Query().Get("service"); service != "" {
query.Service = strings.TrimSpace(service)
}
if tool := r.URL.Query().Get("tool"); tool != "" {
query.Tool = strings.TrimSpace(tool)
}
if logLevel := r.URL.Query().Get("log_level"); logLevel != "" {
query.LogLevel = strings.TrimSpace(logLevel)
}
if entryType := r.URL.Query().Get("type"); entryType != "" {
query.Type = strings.TrimSpace(entryType)
}
if orderBy := r.URL.Query().Get("order_by"); orderBy != "" {
validFields := []string{"timestamp", "service", "tool", "type", "log_level"}
if slices.Contains(validFields, orderBy) {
query.OrderBy = orderBy
}
}
if orderDesc := r.URL.Query().Get("order_desc"); orderDesc != "" {
query.OrderDesc = orderDesc == "true"
}
return query
}