202 lines
4.7 KiB
Go
202 lines
4.7 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/elastic/go-elasticsearch/v7"
|
|
)
|
|
|
|
type WebService struct {
|
|
server *http.Server
|
|
esClient *elasticsearch.Client
|
|
config *Config
|
|
}
|
|
|
|
func NewWebService(config *Config, esClient *elasticsearch.Client) *WebService {
|
|
mux := http.NewServeMux()
|
|
|
|
ws := &WebService{
|
|
esClient: esClient,
|
|
config: config,
|
|
}
|
|
|
|
mux.HandleFunc("/export", ws.handleExport)
|
|
mux.HandleFunc("/health", ws.handleHealth)
|
|
mux.HandleFunc("/indices", ws.handleIndices)
|
|
|
|
addr := fmt.Sprintf("%s:%d", config.WebService.Host, config.WebService.Port)
|
|
ws.server = &http.Server{
|
|
Addr: addr,
|
|
Handler: mux,
|
|
ReadTimeout: 30 * time.Second,
|
|
WriteTimeout: 300 * time.Second,
|
|
IdleTimeout: 60 * time.Second,
|
|
}
|
|
|
|
return ws
|
|
}
|
|
|
|
func (ws *WebService) Start(ctx context.Context) error {
|
|
go func() {
|
|
<-ctx.Done()
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
if err := ws.server.Shutdown(shutdownCtx); err != nil {
|
|
slog.Error("web service shutdown error", "error", err)
|
|
}
|
|
}()
|
|
|
|
slog.Info("Starting web service", "address", ws.server.Addr)
|
|
|
|
if err := ws.server.ListenAndServe(); err != http.ErrServerClosed {
|
|
return fmt.Errorf("web service error: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ws *WebService) handleExport(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodGet {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
indices := ws.parseIndicesParam(r)
|
|
if len(indices) == 0 {
|
|
http.Error(w, "No indices specified. Use ?indices=index1,index2", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
size := ws.parseSizeParam(r)
|
|
|
|
slog.Info("Export request received", "indices", indices, "size", size)
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.Header().Set("Content-Disposition", "attachment; filename=elasticsearch_export.json")
|
|
|
|
exporter := NewElasticsearchExporter(ws.esClient)
|
|
if err := exporter.ExportToStream(r.Context(), indices, size, w); err != nil {
|
|
slog.Error("export error", "error", err)
|
|
http.Error(w, fmt.Sprintf("Export error: %v", err), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
slog.Info("Export completed successfully", "indices", indices)
|
|
}
|
|
|
|
func (ws *WebService) handleHealth(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodGet {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
res, err := ws.esClient.Info(ws.esClient.Info.WithContext(ctx))
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusServiceUnavailable)
|
|
json.NewEncoder(w).Encode(map[string]any{
|
|
"status": "unhealthy",
|
|
"error": err.Error(),
|
|
})
|
|
return
|
|
}
|
|
res.Body.Close()
|
|
|
|
if res.IsError() {
|
|
w.WriteHeader(http.StatusServiceUnavailable)
|
|
json.NewEncoder(w).Encode(map[string]any{
|
|
"status": "unhealthy",
|
|
"error": res.String(),
|
|
})
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(map[string]any{
|
|
"status": "healthy",
|
|
"timestamp": time.Now(),
|
|
})
|
|
}
|
|
|
|
func (ws *WebService) handleIndices(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodGet {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
res, err := ws.esClient.Cat.Indices(
|
|
ws.esClient.Cat.Indices.WithContext(ctx),
|
|
ws.esClient.Cat.Indices.WithFormat("json"),
|
|
)
|
|
if err != nil {
|
|
http.Error(w, fmt.Sprintf("Error fetching indices: %v", err), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
defer res.Body.Close()
|
|
|
|
if res.IsError() {
|
|
http.Error(w, fmt.Sprintf("Elasticsearch error: %s", res.String()), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
var indices []map[string]any
|
|
if err := json.NewDecoder(res.Body).Decode(&indices); err != nil {
|
|
http.Error(w, fmt.Sprintf("Error decoding response: %v", err), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(map[string]any{
|
|
"indices": indices,
|
|
"count": len(indices),
|
|
})
|
|
}
|
|
|
|
func (ws *WebService) parseIndicesParam(r *http.Request) []string {
|
|
indicesParam := r.URL.Query().Get("indices")
|
|
if indicesParam == "" {
|
|
return nil
|
|
}
|
|
|
|
indices := strings.Split(indicesParam, ",")
|
|
var result []string
|
|
for _, index := range indices {
|
|
index = strings.TrimSpace(index)
|
|
if index != "" {
|
|
result = append(result, index)
|
|
}
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
func (ws *WebService) parseSizeParam(r *http.Request) int {
|
|
sizeParam := r.URL.Query().Get("size")
|
|
if sizeParam == "" {
|
|
return 1000
|
|
}
|
|
|
|
size, err := strconv.Atoi(sizeParam)
|
|
if err != nil || size <= 0 {
|
|
return 1000
|
|
}
|
|
|
|
if size > 10000 {
|
|
size = 10000
|
|
}
|
|
|
|
return size
|
|
}
|