watch-tool/web_service.go

418 lines
11 KiB
Go

package main
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"os/exec"
"slices"
"strconv"
"strings"
"time"
"watch-tool/models"
)
type WebService struct {
server *http.Server
storage StorageInterface
config *Config
}
func LoggingMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
slog.Debug("WebService", "Remote-Address", r.RemoteAddr, "Method", r.Method, "Path", r.URL.Path)
next.ServeHTTP(w, r)
})
}
func NewWebService(config *Config, storage StorageInterface) *WebService {
mux := http.NewServeMux()
ws := &WebService{
storage: storage,
config: config,
}
mux.HandleFunc("GET /health", ws.handleHealth)
mux.HandleFunc("GET /logs", ws.handleLogs)
mux.HandleFunc("GET /export", ws.handleExport)
mux.HandleFunc("GET /stats", ws.handleStats)
mux.HandleFunc("GET /stats/{service}", ws.handleServiceStats)
loggedMux := LoggingMiddleware(mux)
addr := fmt.Sprintf("%s:%d", config.WebService.Host, config.WebService.Port)
ws.server = &http.Server{
Addr: addr,
Handler: loggedMux,
ReadTimeout: 30 * time.Second,
WriteTimeout: 300 * time.Second,
IdleTimeout: 60 * time.Second,
}
return ws
}
func (ws *WebService) Start(ctx context.Context) error {
go func() {
<-ctx.Done()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := ws.server.Shutdown(shutdownCtx); err != nil {
slog.Error("web service shutdown error", "error", err)
}
}()
slog.Info("Starting web service", "address", ws.server.Addr)
if err := ws.server.ListenAndServe(); err != http.ErrServerClosed {
return fmt.Errorf("web service error: %w", err)
}
return nil
}
func (ws *WebService) handleHealth(w http.ResponseWriter, r *http.Request) {
status := map[string]any{
"status": "healthy",
"timestamp": time.Now(),
"storage": "sqlite",
}
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
_, err := ws.storage.Query(ctx, StorageQuery{
Limit: 1,
})
if err != nil {
status["storage_status"] = "unhealthy"
status["storage_error"] = err.Error()
w.WriteHeader(http.StatusServiceUnavailable)
} else {
status["storage_status"] = "healthy"
}
statusMap := make(map[string]any)
statusMap["watch"] = status
for _, service := range ws.config.Services {
statusCommand := []string{"sudo", "systemctl", "status", service.Name, "--no-pager"}
if service.Enabled {
serviceStatus, err := exec.Command(statusCommand[0], statusCommand[1:]...).Output()
if err != nil {
slog.Error("error executing status command", "error", err)
continue
}
lines := strings.SplitSeq(string(serviceStatus), "\n")
for line := range lines {
if strings.Contains(line, "Active:") {
serviceHealth, found := strings.CutPrefix(strings.TrimSpace(line), "Active:")
if found {
statusMap[service.Name] = map[string]any{"status": serviceHealth, "timestamp": time.Now()}
}
}
}
}
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(statusMap)
}
func (ws *WebService) handleLogs(w http.ResponseWriter, r *http.Request) {
query := ws.parseLogsQuery(r)
ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
defer cancel()
entries, err := ws.storage.Query(ctx, query)
if err != nil {
http.Error(w, fmt.Sprintf("Query error: %v", err), http.StatusInternalServerError)
return
}
response := map[string]any{
"entries": entries,
"count": len(entries),
"query": query,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
func (ws *WebService) handleExport(w http.ResponseWriter, r *http.Request) {
query := ws.parseLogsQuery(r)
ctx, cancel := context.WithTimeout(r.Context(), 300*time.Second)
defer cancel()
entries, err := ws.storage.Query(ctx, query)
if err != nil {
http.Error(w, fmt.Sprintf("Export query error: %v", err), http.StatusInternalServerError)
return
}
filename := fmt.Sprintf("watch_export_%s.json", time.Now().Format("20060102_150405"))
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", filename))
exportData := map[string]any{
"export_info": map[string]any{
"timestamp": time.Now(),
"entry_count": len(entries),
"query": query,
"exported_by": "watch",
},
"entries": entries,
}
if err := json.NewEncoder(w).Encode(exportData); err != nil {
slog.Error("Failed to encode export data", "error", err)
return
}
slog.Info("Data exported", "count", len(entries), "query", query)
}
func (ws *WebService) handleStats(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
allEntries, err := ws.storage.Query(ctx, StorageQuery{})
if err != nil {
http.Error(w, fmt.Sprintf("Stats query error: %v", err), http.StatusInternalServerError)
return
}
recentEntries, err := ws.storage.Query(ctx, StorageQuery{
StartTime: time.Now().Add(-time.Hour),
})
if err != nil {
slog.Error("Failed to query recent entries", "error", err)
recentEntries = []models.LogMessage{}
}
stats := map[string]any{
"total_entries": len(allEntries),
"recent_entries": len(recentEntries),
"timestamp": time.Now(),
}
typeCounts := make(map[string]int)
serviceCounts := make(map[string]int)
toolCounts := make(map[string]int)
for _, entry := range allEntries {
typeCounts[entry.Type]++
if entry.Service != "" {
serviceCounts[entry.Service]++
}
if entry.Tool != "" {
toolCounts[entry.Tool]++
}
}
stats["by_type"] = typeCounts
stats["by_service"] = serviceCounts
stats["by_tool"] = toolCounts
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(stats)
}
func (ws *WebService) handleServiceStats(w http.ResponseWriter, r *http.Request) {
service := r.PathValue("service")
if service == "" {
http.Error(w, "Service parameter is missing", http.StatusBadRequest)
return
}
timeRangeStr := r.URL.Query().Get("time_range")
var startTime time.Time
if timeRangeStr == "" {
startTime = time.Now().Add(-24 * time.Hour)
} else {
duration, err := time.ParseDuration(timeRangeStr)
if err != nil {
http.Error(w, fmt.Sprintf("Invalid time_range: %v", err), http.StatusBadRequest)
return
}
startTime = time.Now().Add(-duration)
}
query := StorageQuery{
Service: service,
StartTime: startTime,
Limit: 0,
OrderDesc: false,
}
ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
defer cancel()
entries, err := ws.storage.Query(ctx, query)
if err != nil {
slog.Error("Failed to query service stats", "service", service, "error", err)
http.Error(w, fmt.Sprintf("Query error: %v", err), http.StatusInternalServerError)
return
}
uniqueTransfersTotal := make(map[string]struct{})
uniqueTransfersIncoming := make(map[string]struct{})
uniqueTransfersOutgoing := make(map[string]struct{})
uniqueTransfersNil := make(map[string]struct{})
for _, entry := range entries {
var identifier string
var direction string
if entry.Fields != nil {
if id, ok := entry.Fields["transfer_id"].(string); ok {
identifier = id
} else if id, ok := entry.Fields["correlation_id"].(string); ok {
identifier = id
}
if dir, ok := entry.Fields["direction"].(string); ok {
direction = dir
} else if rawName, ok := entry.Fields["transfer_name_raw"].(string); ok {
if strings.Contains(rawName, "-in") {
direction = "incoming"
} else if strings.Contains(rawName, "-out") {
direction = "outgoing"
}
}
if direction == "" && entry.Service == "tixstream" {
if strings.HasPrefix(entry.Raw, "in:") {
direction = "incoming"
} else if strings.HasPrefix(entry.Raw, "out:") {
direction = "outgoing"
}
}
}
if identifier == "" && entry.ServiceInformation != nil {
switch v := entry.ServiceInformation.(type) {
case models.TSTransferInfo:
identifier = v.TransferID
direction = v.Direction
case *models.TSTransferInfo:
identifier = v.TransferID
direction = v.Direction
case models.TJMTransferInfo:
identifier = v.TransferID
direction = v.Direction
case *models.TJMTransferInfo:
identifier = v.TransferID
direction = v.Direction
case map[string]any:
identifier, _ = v["transfer_identifier"].(string)
direction, _ = v["direction"].(string)
}
}
if identifier != "" && identifier != "no_transfer_id" {
uniqueTransfersTotal[identifier] = struct{}{}
dirLower := strings.ToLower(direction)
if strings.Contains(dirLower, "outgoing") || strings.Contains(dirLower, "out") {
uniqueTransfersOutgoing[identifier] = struct{}{}
} else if strings.Contains(dirLower, "incoming") || strings.Contains(dirLower, "in") {
uniqueTransfersIncoming[identifier] = struct{}{}
} else {
uniqueTransfersNil[identifier] = struct{}{}
}
}
}
stats := map[string]any{
"service": service,
"start_time": startTime,
"end_time": time.Now(),
"transfer_counts": map[string]any{
"total": len(uniqueTransfersTotal),
"incoming": len(uniqueTransfersIncoming),
"outgoing": len(uniqueTransfersOutgoing),
"nil_or_unknown_direction": len(uniqueTransfersNil),
},
"entry_count": len(entries),
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(stats)
}
func (ws *WebService) parseLogsQuery(r *http.Request) StorageQuery {
query := StorageQuery{
Limit: 100,
OrderBy: "timestamp",
OrderDesc: true,
}
if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
if limit, err := strconv.Atoi(limitStr); err == nil && limit > 0 {
if limit > 10000 {
limit = 10000
}
query.Limit = limit
}
}
if offsetStr := r.URL.Query().Get("offset"); offsetStr != "" {
if offset, err := strconv.Atoi(offsetStr); err == nil && offset >= 0 {
query.Offset = offset
}
}
if startTime := r.URL.Query().Get("start_time"); startTime != "" {
if t, err := time.Parse(time.RFC3339, startTime); err == nil {
query.StartTime = t
}
}
if endTime := r.URL.Query().Get("end_time"); endTime != "" {
if t, err := time.Parse(time.RFC3339, endTime); err == nil {
query.EndTime = t
}
}
if service := r.URL.Query().Get("service"); service != "" {
query.Service = strings.TrimSpace(service)
}
if tool := r.URL.Query().Get("tool"); tool != "" {
query.Tool = strings.TrimSpace(tool)
}
if logLevel := r.URL.Query().Get("log_level"); logLevel != "" {
query.LogLevel = strings.TrimSpace(logLevel)
}
if entryType := r.URL.Query().Get("type"); entryType != "" {
query.Type = strings.TrimSpace(entryType)
}
if orderBy := r.URL.Query().Get("order_by"); orderBy != "" {
validFields := []string{"timestamp", "service", "tool", "type", "log_level"}
if slices.Contains(validFields, orderBy) {
query.OrderBy = orderBy
}
}
if orderDesc := r.URL.Query().Get("order_desc"); orderDesc != "" {
query.OrderDesc = orderDesc == "true"
}
return query
}