watch-tool/elasticsearch_exporter.go

272 lines
7.1 KiB
Go

package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"strings"
"time"
"github.com/elastic/go-elasticsearch/v7"
)
// ElasticsearchExporter streams the contents of Elasticsearch indices to an
// io.Writer as JSON.
type ElasticsearchExporter struct {
	// client is used for every search and scroll request the exporter makes.
	client *elasticsearch.Client
}
// NewElasticsearchExporter returns an exporter that issues all of its requests
// through client. The client is stored as given; it is neither copied nor
// checked for nil.
func NewElasticsearchExporter(client *elasticsearch.Client) *ElasticsearchExporter {
	exporter := &ElasticsearchExporter{}
	exporter.client = client
	return exporter
}
// ExportResult summarises the export of a single index. It is serialised as an
// element of the "results" array that closes the export stream.
type ExportResult struct {
	// Index is the name of the exported index.
	Index string `json:"index"`
	// DocumentCount is the number of documents recorded as exported.
	DocumentCount int `json:"document_count"`
	// StartTime and EndTime bracket the export of this index.
	StartTime time.Time `json:"start_time"`
	EndTime   time.Time `json:"end_time"`
	// Duration is the elapsed time rendered with time.Duration.String.
	Duration string `json:"duration"`
	// Error describes why the export of this index failed; it is omitted from
	// the JSON output when empty.
	Error string `json:"error,omitempty"`
}
// ExportToStream writes the documents of each index in indices to writer as a
// single JSON object with three members:
//
//   - "export_info": the start timestamp, the requested indices and batchSize;
//   - "data": one member per exported index, holding an array of the _source
//     of its documents, fetched batchSize hits per request;
//   - "results": an array with one ExportResult per exported index.
//
// Indices are exported one after another. A failure while exporting one index
// is logged and recorded in that index's ExportResult.Error rather than
// aborting the export. The returned error reports either a failure writing the
// surrounding JSON structure, or ctx being done when the export finishes. In
// the latter case indices that had not been started are skipped, but the JSON
// object is still closed so what was exported remains readable.
func (e *ElasticsearchExporter) ExportToStream(ctx context.Context, indices []string, batchSize int, writer io.Writer) error {
	startTime := time.Now()

	// Marshal export_info as a complete object and write it verbatim, instead
	// of trimming braces off the marshalled text.
	info, err := json.MarshalIndent(map[string]any{
		"timestamp":  startTime,
		"indices":    indices,
		"batch_size": batchSize,
	}, " ", " ")
	if err != nil {
		return fmt.Errorf("error marshalling export info: %w", err)
	}
	if _, err := io.WriteString(writer, "{\n \"export_info\": "); err != nil {
		return fmt.Errorf("error writing export header: %w", err)
	}
	if _, err := writer.Write(info); err != nil {
		return fmt.Errorf("error writing export info: %w", err)
	}
	if _, err := io.WriteString(writer, ",\n \"data\": {\n"); err != nil {
		return fmt.Errorf("error writing data header: %w", err)
	}

	results := make([]ExportResult, 0, len(indices))
	for i, index := range indices {
		// Do not start new searches once the caller has given up.
		if ctx.Err() != nil {
			break
		}
		if i > 0 {
			if _, err := io.WriteString(writer, ",\n"); err != nil {
				return fmt.Errorf("error writing separator: %w", err)
			}
		}
		result := e.exportIndex(ctx, index, batchSize, writer)
		if result.Error != "" {
			slog.Error("error exporting index", "index", index, "error", result.Error)
		}
		results = append(results, result)
	}

	if _, err := io.WriteString(writer, "\n },\n \"results\": "); err != nil {
		return fmt.Errorf("error writing results header: %w", err)
	}
	if err := json.NewEncoder(writer).Encode(results); err != nil {
		return fmt.Errorf("error writing results: %w", err)
	}
	if _, err := io.WriteString(writer, "}\n"); err != nil {
		return fmt.Errorf("error writing final bracket: %w", err)
	}

	if err := ctx.Err(); err != nil {
		return fmt.Errorf("export cancelled after %d of %d indices: %w", len(results), len(indices), err)
	}
	slog.Info("Export completed", "duration", time.Since(startTime), "indices_count", len(indices))
	return nil
}
// exportScrollKeepAlive is how long Elasticsearch keeps a scroll context alive
// between consecutive scroll requests. The esapi WithScroll options take a
// time.Duration, so a bare integer literal there would mean nanoseconds.
const exportScrollKeepAlive = time.Minute

// exportClearScrollTimeout bounds the best-effort request that releases a
// scroll context once an index has been exported.
const exportClearScrollTimeout = 10 * time.Second

// exportScrollPage is the part of a search/scroll response body the exporter
// reads. _source is kept as raw JSON so documents are written back out
// faithfully: decoding into map[string]any would turn every number into a
// float64 and corrupt integers larger than 2^53. Missing fields decode to zero
// values instead of causing a panic.
type exportScrollPage struct {
	ScrollID string `json:"_scroll_id"`
	Hits     struct {
		Hits []struct {
			Source json.RawMessage `json:"_source"`
		} `json:"hits"`
	} `json:"hits"`
}

// decodeExportScrollPage decodes a search/scroll response body and always
// closes it, so no response body outlives the request it belongs to.
func decodeExportScrollPage(body io.ReadCloser) (exportScrollPage, error) {
	defer body.Close()
	var page exportScrollPage
	err := json.NewDecoder(body).Decode(&page)
	return page, err
}

// exportIndex writes every document of index to writer as one member of the
// enclosing "data" object, in the form ` "<index>": [<doc>,\n<doc>...\n ]`,
// and reports the outcome in an ExportResult.
//
// Errors are never returned; they are recorded in ExportResult.Error, and
// DocumentCount reflects the documents actually written even on failure.
// Once the opening bracket has been written the closing bracket is always
// written too, so a failure part-way through an index still leaves the
// surrounding stream well-formed JSON.
func (e *ElasticsearchExporter) exportIndex(ctx context.Context, index string, batchSize int, writer io.Writer) ExportResult {
	startTime := time.Now()
	result := ExportResult{Index: index, StartTime: startTime}

	// json.Marshal emits the index name as a properly escaped JSON string.
	// Marshalling a string cannot fail, so the error is deliberately ignored.
	key, _ := json.Marshal(index)
	if _, err := fmt.Fprintf(writer, " %s: [\n", key); err != nil {
		result.Error = fmt.Sprintf("error writing index header: %v", err)
		result.EndTime = time.Now()
		result.Duration = result.EndTime.Sub(startTime).String()
		return result
	}

	count, err := e.streamIndex(ctx, index, batchSize, writer)
	// Close the array even when streaming failed, so the caller can carry on
	// with the next index without producing malformed JSON.
	if _, werr := io.WriteString(writer, "\n ]"); werr != nil && err == nil {
		err = fmt.Errorf("error writing index footer: %w", werr)
	}

	result.DocumentCount = count
	result.EndTime = time.Now()
	result.Duration = result.EndTime.Sub(startTime).String()
	if err != nil {
		result.Error = err.Error()
		return result
	}
	slog.Info("Index export completed",
		"index", index,
		"documents", count,
		"duration", result.Duration,
	)
	return result
}

// streamIndex runs a match_all scroll search over index, batchSize hits per
// request, and writes the _source of every hit to writer separated by ",\n".
// It returns the number of documents written, which is meaningful even when an
// error is also returned. Any scroll context it opens is cleared before it
// returns.
func (e *ElasticsearchExporter) streamIndex(ctx context.Context, index string, batchSize int, writer io.Writer) (int, error) {
	res, err := e.client.Search(
		e.client.Search.WithContext(ctx),
		e.client.Search.WithIndex(index),
		e.client.Search.WithScroll(exportScrollKeepAlive),
		e.client.Search.WithSize(batchSize),
		e.client.Search.WithBody(strings.NewReader(`{"query":{"match_all":{}}}`)),
	)
	if err != nil {
		return 0, fmt.Errorf("error in initial search: %w", err)
	}
	if res.IsError() {
		msg := res.String()
		res.Body.Close()
		return 0, fmt.Errorf("elasticsearch error: %s", msg)
	}
	page, err := decodeExportScrollPage(res.Body)
	if err != nil {
		return 0, fmt.Errorf("error decoding search result: %w", err)
	}
	if page.ScrollID == "" {
		return 0, fmt.Errorf("no scroll_id found in search result")
	}

	scrollID := page.ScrollID
	// The closure reads scrollID when streamIndex returns, so the most recent
	// scroll ID is the one released.
	defer func() { e.clearScroll(scrollID) }()

	written := 0
	for {
		for _, hit := range page.Hits.Hits {
			// Separators depend on what has actually been written, so an
			// empty first page can never produce a leading comma.
			if written > 0 {
				if _, err := io.WriteString(writer, ",\n"); err != nil {
					return written, fmt.Errorf("error writing separator: %w", err)
				}
			}
			if err := e.writeDocument(writer, hit.Source); err != nil {
				return written, fmt.Errorf("error writing document: %w", err)
			}
			written++
		}

		if err := ctx.Err(); err != nil {
			return written, fmt.Errorf("context cancelled: %w", err)
		}
		scrollRes, err := e.client.Scroll(
			e.client.Scroll.WithContext(ctx),
			e.client.Scroll.WithScrollID(scrollID),
			e.client.Scroll.WithScroll(exportScrollKeepAlive),
		)
		if err != nil {
			return written, fmt.Errorf("error in scroll request: %w", err)
		}
		if scrollRes.IsError() {
			msg := scrollRes.String()
			scrollRes.Body.Close()
			return written, fmt.Errorf("elasticsearch scroll error: %s", msg)
		}
		if page, err = decodeExportScrollPage(scrollRes.Body); err != nil {
			return written, fmt.Errorf("error decoding scroll result: %w", err)
		}
		if len(page.Hits.Hits) == 0 {
			return written, nil
		}
		// Keep the previous ID if a response omits it, rather than sending an
		// empty scroll ID on the next request.
		if page.ScrollID != "" {
			scrollID = page.ScrollID
		}
	}
}

// clearScroll asks Elasticsearch to release the scroll context scrollID. It is
// best-effort: failures are only logged, since an abandoned context still
// expires once its keep-alive lapses. It uses its own bounded context so the
// request is still sent when the export itself was cancelled.
func (e *ElasticsearchExporter) clearScroll(scrollID string) {
	ctx, cancel := context.WithTimeout(context.Background(), exportClearScrollTimeout)
	defer cancel()
	res, err := e.client.ClearScroll(
		e.client.ClearScroll.WithContext(ctx),
		e.client.ClearScroll.WithScrollID(scrollID),
	)
	if err != nil {
		slog.Warn("error clearing scroll", "error", err)
		return
	}
	defer res.Body.Close()
	if res.IsError() {
		slog.Warn("error clearing scroll", "status", res.StatusCode)
	}
}
// writeDocument writes document to writer as indented JSON, preceded by a
// single space. Lines after the first carry a one-space prefix and each nesting
// level adds one more space. Marshalling failures are wrapped with context;
// write failures are returned as-is.
func (e *ElasticsearchExporter) writeDocument(writer io.Writer, document any) error {
	encoded, marshalErr := json.MarshalIndent(document, " ", " ")
	if marshalErr != nil {
		return fmt.Errorf("error marshalling document: %w", marshalErr)
	}
	// Two separate writes: the leading space, then the document itself.
	for _, chunk := range [][]byte{[]byte(" "), encoded} {
		if _, err := writer.Write(chunk); err != nil {
			return err
		}
	}
	return nil
}