feat: implement new generic parser and improve production readiness
This commit is contained in:
parent
8364218234
commit
0830b403e0
34 changed files with 1715 additions and 2114 deletions
|
|
@ -4,280 +4,87 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"time"
|
||||
"watch-tool/models"
|
||||
|
||||
"github.com/elastic/go-elasticsearch/v8"
|
||||
)
|
||||
|
||||
// ElasticsearchExporter bundles an Elasticsearch client with the
// configuration it was built from. The methods visible in this diff use
// client for Bulk, Info, Search and Scroll requests, and read config.Index
// (target index name) and config.Timeout (converted to seconds).
type ElasticsearchExporter struct {
|
||||
client *elasticsearch.Client
|
||||
config ElasticsearchConfig
|
||||
}
|
||||
|
||||
func NewElasticsearchExporter(client *elasticsearch.Client) *ElasticsearchExporter {
|
||||
func NewElasticsearchExporter(config ElasticsearchConfig) (*ElasticsearchExporter, error) {
|
||||
client, err := NewElasticsearchClient(config)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create Elasticsearch client: %w", err)
|
||||
}
|
||||
|
||||
return &ElasticsearchExporter{
|
||||
client: client,
|
||||
}
|
||||
config: config,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ExportResult summarizes the export of a single index. exportIndex fills
// one in and returns it; the struct serializes to JSON using the tags below.
type ExportResult struct {
|
||||
// Index is the name of the index that was exported.
Index string `json:"index"`
|
||||
// DocumentCount is the number of documents written for this index.
DocumentCount int `json:"document_count"`
|
||||
// StartTime is taken when the export of this index begins.
StartTime time.Time `json:"start_time"`
|
||||
// EndTime is taken when the export of this index returns.
EndTime time.Time `json:"end_time"`
|
||||
// Duration is the elapsed time since StartTime, rendered with
// time.Duration.String().
Duration string `json:"duration"`
|
||||
// Error holds a failure message; callers treat an empty string as
// success, and the field is omitted from JSON when empty.
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
func (e *ElasticsearchExporter) ExportToStream(ctx context.Context, indices []string, batchSize int, since int, writer io.Writer) error {
|
||||
startTime := time.Now()
|
||||
|
||||
if _, err := writer.Write([]byte("{\n \"export_info\": {\n")); err != nil {
|
||||
return fmt.Errorf("error writing export header: %w", err)
|
||||
func (e *ElasticsearchExporter) Export(ctx context.Context, entries []models.LogMessage) error {
|
||||
if len(entries) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
exportInfo := map[string]any{
|
||||
"timestamp": startTime,
|
||||
"indices": indices,
|
||||
"batch_size": batchSize,
|
||||
"sinceDays": since,
|
||||
}
|
||||
var body strings.Builder
|
||||
for _, entry := range entries {
|
||||
indexName := e.config.Index
|
||||
|
||||
infoBytes, err := json.MarshalIndent(exportInfo, " ", " ")
|
||||
if err != nil {
|
||||
return fmt.Errorf("error marshalling export info: %w", err)
|
||||
}
|
||||
indexLine := fmt.Sprintf(`{"index":{"_index":"%s"}}`, indexName)
|
||||
body.WriteString(indexLine)
|
||||
body.WriteString("\n")
|
||||
|
||||
infoStr := string(infoBytes)
|
||||
infoStr = strings.TrimPrefix(infoStr, "{")
|
||||
infoStr = strings.TrimSuffix(infoStr, "}")
|
||||
|
||||
if _, err := writer.Write([]byte(infoStr)); err != nil {
|
||||
return fmt.Errorf("error writing export info: %w", err)
|
||||
}
|
||||
|
||||
if _, err := writer.Write([]byte("\n },\n \"data\": {\n")); err != nil {
|
||||
return fmt.Errorf("error writing data header: %w", err)
|
||||
}
|
||||
|
||||
results := make([]ExportResult, 0, len(indices))
|
||||
first := true
|
||||
|
||||
for _, index := range indices {
|
||||
if !first {
|
||||
if _, err := writer.Write([]byte(",\n")); err != nil {
|
||||
return fmt.Errorf("error writing separator: %w", err)
|
||||
}
|
||||
}
|
||||
first = false
|
||||
|
||||
result := e.exportIndex(ctx, index, batchSize, since, writer)
|
||||
results = append(results, result)
|
||||
|
||||
if result.Error != "" {
|
||||
slog.Error("error exporting index", "index", index, "error", result.Error)
|
||||
data, err := json.Marshal(entry)
|
||||
if err != nil {
|
||||
slog.Error("error marshalling JSON", "error", err)
|
||||
continue
|
||||
}
|
||||
body.WriteString(string(data))
|
||||
body.WriteString("\n")
|
||||
}
|
||||
|
||||
if _, err := writer.Write([]byte("\n },\n \"results\": ")); err != nil {
|
||||
return fmt.Errorf("error writing results header: %w", err)
|
||||
}
|
||||
timeout := time.Duration(e.config.Timeout) * time.Second
|
||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
if err := json.NewEncoder(writer).Encode(results); err != nil {
|
||||
return fmt.Errorf("error writing results: %w", err)
|
||||
}
|
||||
|
||||
if _, err := writer.Write([]byte("}\n")); err != nil {
|
||||
return fmt.Errorf("error writing final bracket: %w", err)
|
||||
}
|
||||
|
||||
duration := time.Since(startTime)
|
||||
slog.Info("Export completed", "duration", duration, "indices_count", len(indices))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *ElasticsearchExporter) exportIndex(ctx context.Context, index string, batchSize int, since int, writer io.Writer) ExportResult {
|
||||
startTime := time.Now()
|
||||
result := ExportResult{
|
||||
Index: index,
|
||||
StartTime: startTime,
|
||||
}
|
||||
|
||||
if _, err := fmt.Fprintf(writer, " \"%s\": [\n", index); err != nil {
|
||||
result.Error = fmt.Sprintf("error writing index header: %v", err)
|
||||
result.EndTime = time.Now()
|
||||
result.Duration = time.Since(startTime).String()
|
||||
return result
|
||||
}
|
||||
|
||||
query := `{"query":{"match_all":{}}}`
|
||||
if since > 0 {
|
||||
query = fmt.Sprintf(`{
|
||||
"query": {
|
||||
"range": {
|
||||
"timestamp": {
|
||||
"gte": "now-%dd/d",
|
||||
"lt": "now/d"
|
||||
}
|
||||
}
|
||||
}
|
||||
}`, since)
|
||||
}
|
||||
res, err := e.client.Search(
|
||||
e.client.Search.WithContext(ctx),
|
||||
e.client.Search.WithIndex(index),
|
||||
e.client.Search.WithScroll(1000),
|
||||
e.client.Search.WithSize(batchSize),
|
||||
e.client.Search.WithBody(strings.NewReader(query)),
|
||||
res, err := e.client.Bulk(
|
||||
strings.NewReader(body.String()),
|
||||
e.client.Bulk.WithContext(ctx),
|
||||
)
|
||||
if err != nil {
|
||||
result.Error = fmt.Sprintf("error in initial search: %v", err)
|
||||
result.EndTime = time.Now()
|
||||
result.Duration = time.Since(startTime).String()
|
||||
return result
|
||||
return fmt.Errorf("bulk request error: %w", err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
if res.IsError() {
|
||||
result.Error = fmt.Sprintf("elasticsearch error: %s", res.String())
|
||||
result.EndTime = time.Now()
|
||||
result.Duration = time.Since(startTime).String()
|
||||
return result
|
||||
return fmt.Errorf("bulk request failed: %s", res.String())
|
||||
}
|
||||
|
||||
var searchResult map[string]any
|
||||
if err := json.NewDecoder(res.Body).Decode(&searchResult); err != nil {
|
||||
result.Error = fmt.Sprintf("error decoding search result: %v", err)
|
||||
result.EndTime = time.Now()
|
||||
result.Duration = time.Since(startTime).String()
|
||||
return result
|
||||
}
|
||||
|
||||
scrollID, ok := searchResult["_scroll_id"].(string)
|
||||
if !ok {
|
||||
result.Error = "no scroll_id found in search result"
|
||||
result.EndTime = time.Now()
|
||||
result.Duration = time.Since(startTime).String()
|
||||
return result
|
||||
}
|
||||
|
||||
hits := searchResult["hits"].(map[string]any)["hits"].([]any)
|
||||
firstDocument := true
|
||||
documentCount := 0
|
||||
|
||||
for _, hit := range hits {
|
||||
if !firstDocument {
|
||||
if _, err := writer.Write([]byte(",\n")); err != nil {
|
||||
result.Error = fmt.Sprintf("error writing separator: %v", err)
|
||||
result.EndTime = time.Now()
|
||||
result.Duration = time.Since(startTime).String()
|
||||
return result
|
||||
}
|
||||
}
|
||||
firstDocument = false
|
||||
|
||||
source := hit.(map[string]any)["_source"]
|
||||
if err := e.writeDocument(writer, source); err != nil {
|
||||
result.Error = fmt.Sprintf("error writing document: %v", err)
|
||||
result.EndTime = time.Now()
|
||||
result.Duration = time.Since(startTime).String()
|
||||
return result
|
||||
}
|
||||
documentCount++
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
result.Error = "context cancelled"
|
||||
result.EndTime = time.Now()
|
||||
result.Duration = time.Since(startTime).String()
|
||||
return result
|
||||
default:
|
||||
}
|
||||
|
||||
scrollRes, err := e.client.Scroll(
|
||||
e.client.Scroll.WithScrollID(scrollID),
|
||||
e.client.Scroll.WithScroll(1000),
|
||||
e.client.Scroll.WithContext(ctx),
|
||||
)
|
||||
if err != nil {
|
||||
result.Error = fmt.Sprintf("error in scroll request: %v", err)
|
||||
break
|
||||
}
|
||||
defer scrollRes.Body.Close()
|
||||
|
||||
if scrollRes.IsError() {
|
||||
result.Error = fmt.Sprintf("elasticsearch scroll error: %s", scrollRes.String())
|
||||
break
|
||||
}
|
||||
|
||||
var scrollResult map[string]any
|
||||
if err := json.NewDecoder(scrollRes.Body).Decode(&scrollResult); err != nil {
|
||||
result.Error = fmt.Sprintf("error decoding scroll result: %v", err)
|
||||
break
|
||||
}
|
||||
|
||||
hits := scrollResult["hits"].(map[string]any)["hits"].([]any)
|
||||
if len(hits) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
scrollID, _ = scrollResult["_scroll_id"].(string)
|
||||
|
||||
for _, hit := range hits {
|
||||
if _, err := writer.Write([]byte(",\n")); err != nil {
|
||||
result.Error = fmt.Sprintf("error writing separator: %v", err)
|
||||
result.EndTime = time.Now()
|
||||
result.Duration = time.Since(startTime).String()
|
||||
return result
|
||||
}
|
||||
|
||||
source := hit.(map[string]any)["_source"]
|
||||
if err := e.writeDocument(writer, source); err != nil {
|
||||
result.Error = fmt.Sprintf("error writing document: %v", err)
|
||||
result.EndTime = time.Now()
|
||||
result.Duration = time.Since(startTime).String()
|
||||
return result
|
||||
}
|
||||
documentCount++
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := writer.Write([]byte("\n ]")); err != nil {
|
||||
if result.Error == "" {
|
||||
result.Error = fmt.Sprintf("error writing index footer: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
result.DocumentCount = documentCount
|
||||
result.EndTime = time.Now()
|
||||
result.Duration = time.Since(startTime).String()
|
||||
|
||||
slog.Info("Index export completed",
|
||||
"index", index,
|
||||
"documents", documentCount,
|
||||
"duration", result.Duration,
|
||||
)
|
||||
|
||||
return result
|
||||
slog.Debug("Batch successfully exported to Elasticsearch", "count", len(entries))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *ElasticsearchExporter) writeDocument(writer io.Writer, document any) error {
|
||||
jsonBytes, err := json.MarshalIndent(document, " ", " ")
|
||||
func (e *ElasticsearchExporter) HealthCheck(ctx context.Context) error {
|
||||
timeout := time.Duration(e.config.Timeout) * time.Second
|
||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
res, err := e.client.Info(e.client.Info.WithContext(ctx))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error marshalling document: %w", err)
|
||||
return fmt.Errorf("health check failed: %w", err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
if _, err := writer.Write([]byte(" ")); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := writer.Write(jsonBytes); err != nil {
|
||||
return err
|
||||
if res.IsError() {
|
||||
return fmt.Errorf("health check failed: %s", res.String())
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue