go-elasticsearch 快速上手


由于官方文档还停留在 v7 版本,特此书写 v8 版本示例,以便使用

安装

go get github.com/elastic/go-elasticsearch/v8

配置解释

type Elasticsearch struct {
	Addresses []string // 使用的Elasticsearch节点列表。
	Username  string   // 用于HTTP基本认证的用户名。
	Password  string   // 用于HTTP基本认证的密码。
	CloudID   string   // Elastic服务的端点(https://elastic.co/cloud)。
	APIKey    string   // 授权用的Base64编码令牌;如果设置,将覆盖用户名/密码和服务令牌。
	ServiceToken string // 授权用的服务令牌;如果设置,将覆盖用户名/密码。
	CertificateFingerprint string // Elasticsearch首次启动时给出的SHA256十六进制指纹。
	Header http.Header // 全局HTTP请求头。

	// PEM编码的证书颁发机构。
	// 设置时,将创建一个空的证书池,并将证书附加到其中。
	// 该选项仅在未指定传输或使用http.Transport时有效。
	CACert []byte

	RetryOnStatus []int // 重试的状态码列表。默认值:502, 503, 504。
	DisableRetry  bool  // 是否禁用重试。默认值:false。
	MaxRetries    int   // 最大重试次数。默认值:3。
	RetryOnError  func(*http.Request, error) bool // 可选函数,指示哪些错误应重试。默认值:nil。
	CompressRequestBody bool // 是否压缩请求体。默认值:false。
	CompressRequestBodyLevel int // 请求体压缩级别。默认值:gzip.DefaultCompression。
	DiscoverNodesOnStart  bool          // 初始化客户端时是否发现节点。默认值:false。
	DiscoverNodesInterval time.Duration // 定期发现节点的间隔时间。默认值:禁用。

	EnableMetrics           bool // 是否启用指标收集。
	EnableDebugLogger       bool // 是否启用调试日志。
	EnableCompatibilityMode bool // 是否发送兼容性头。

	DisableMetaHeader bool // 是否禁用额外的“X-Elastic-Client-Meta” HTTP头。

	RetryBackoff func(attempt int) time.Duration // 可选的退避时长。默认值:nil。

	Transport http.RoundTripper         // HTTP传输对象。
	Logger    elastictransport.Logger   // 日志记录对象。
	Selector  elastictransport.Selector // 选择器对象。

	// 自定义ConnectionPool的可选构造函数。默认值:nil。
	ConnectionPoolFunc func([]*elastictransport.Connection, elastictransport.Selector) elastictransport.ConnectionPool

	Instrumentation elastictransport.Instrumentation // 在客户端中启用仪表化。
}

封装

本处对 ES 进行初步封装,使其更符合本人用法,大家看着用ψ(`∇´)ψ

es.go

package elasticsearch

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"github.com/elastic/go-elasticsearch/v8"
	"github.com/pkg/errors"
	"io"
)

type ES struct {
	client *elasticsearch.Client
}

// NewES cfg 的数据量较大,因此选择使用传地址
func NewES(cfg *elasticsearch.Config) (*ES, error) {
	client, err := elasticsearch.NewClient(*cfg)
	if err != nil {
		return nil, err
	}

	es := &ES{
		client: client,
	}
	return es, nil
}

// ResponseParse 响应解析函数
type ResponseParse func(body io.ReadCloser) (map[string]any, error)

// Search 搜索 API 的请求方法
func (es ES) Search(ctx context.Context, index string, body any, parser ResponseParse) (map[string]any, error) {
	// 获取 ES 实例
	buf := new(bytes.Buffer)
	if err := json.NewEncoder(buf).Encode(body); err != nil {
		return nil, errors.Wrap(err, "Error encoding query")
	}
	data, err := es.client.Search(
		es.client.Search.WithContext(ctx),
		es.client.Search.WithIndex(index),
		es.client.Search.WithBody(buf),
		es.client.Search.WithPretty(),
	)
	if err != nil {
		return nil, errors.Wrap(err, "查询ES失败")
	}
	defer data.Body.Close()

	if data.IsError() {
		return nil, decodeErrorResponse(data.Body, data.Status())
	}
	return parser(data.Body)
}

// decodeErrorResponse 解析错误响应
func decodeErrorResponse(body io.ReadCloser, status string) error {
	var e map[string]interface{}
	if err := json.NewDecoder(body).Decode(&e); err != nil {
		return errors.Wrap(err, "解析错误响应失败")
	}
	return errors.New(fmt.Sprintf("[%s] %s: %s", status, e["error"].(map[string]interface{})["type"], e["error"].(map[string]interface{})["reason"]))
}

default.go

package elasticsearch

import (
	"SnapLink/internal/config"
	"context"
	"github.com/elastic/go-elasticsearch/v8"
	"github.com/pkg/errors"
	"github.com/zhufuyi/sponge/pkg/logger"
	"sync"
)

var instance struct {
	es   *ES
	once sync.Once
}

func esInstance() *ES {
	instance.once.Do(
		func() {
			esConfig := config.Get().Elasticsearch
			cfg := elasticsearch.Config{
				Addresses:                esConfig.Addresses,
				Username:                 esConfig.Username,
				Password:                 esConfig.Password,
				CloudID:                  esConfig.CloudID,
				APIKey:                   esConfig.APIKey,
				ServiceToken:             esConfig.ServiceToken,
				CertificateFingerprint:   esConfig.CertificateFingerprint,
				Header:                   nil,
				CACert:                   nil,
				RetryOnStatus:            esConfig.RetryOnStatus,
				DisableRetry:             esConfig.DisableRetry,
				MaxRetries:               esConfig.MaxRetries,
				RetryOnError:             nil,
				CompressRequestBody:      esConfig.CompressRequestBody,
				CompressRequestBodyLevel: esConfig.CompressRequestBodyLevel,
				DiscoverNodesOnStart:     esConfig.DiscoverNodesOnStart,
				DiscoverNodesInterval:    esConfig.DiscoverNodesInterval,
				EnableMetrics:            esConfig.EnableMetrics,
				EnableDebugLogger:        esConfig.EnableDebugLogger,
				EnableCompatibilityMode:  esConfig.EnableCompatibilityMode,
				DisableMetaHeader:        esConfig.DisableMetaHeader,
				RetryBackoff:             nil,
				Transport:                nil,
				Logger:                   nil,
				Selector:                 nil,
				ConnectionPoolFunc:       nil,
				Instrumentation:          nil,
			}
			var err error
			instance.es, err = NewES(&cfg)
			if err != nil {
				logger.Panic(errors.Wrap(err, "init default es failed").Error())
			}
		})
	return instance.es
}

// Search ES 搜索 API
func Search(ctx context.Context, index string, body any, parser ResponseParse) (map[string]any, error) {
	return esInstance().Search(ctx, index, body, parser)
}

用法示例

本处截取自我的开源项目 Snaplink


// 去 ES 中查询访问情况
func searchTodayAccessStatic(ctx context.Context, uris []string) (map[string]any, error) {
	// 获取 ES 实例
	body := map[string]any{
		"size":    0,
		"_source": false,
		"query": map[string]any{
			"terms": map[string]any{
				"info.uri.keyword": uris,
			},
		},
		"aggs": map[string]any{
			"statics": map[string]any{
				"terms": map[string]any{
					"field": "info.uri.keyword",
				},
				"aggs": map[string]any{
					"today_pv": map[string]any{
						"value_count": map[string]any{
							"field": "info.uri.keyword",
						},
					},
					"today_uip": map[string]any{
						"cardinality": map[string]any{
							"field": "ip.keyword",
						},
					},
					"today_uv": map[string]any{
						"cardinality": map[string]any{
							"field": "uid.keyword",
						},
					},
				},
			},
		},
	}
	index := fmt.Sprintf("logstash-accesslog-%s.*", time.Now().Format("2006.01.02"))
	return elasticsearch.Search(ctx, index, body, accessStaticResponseParser)
}

// accessStaticResponseParser 访问日志查询响应
func accessStaticResponseParser(body io.ReadCloser) (map[string]any, error) {
	var r map[string]interface{}
	if err := json.NewDecoder(body).Decode(&r); err != nil {
		return nil, errors.Wrap(err, "解析响应失败")
	}

	aggregations, ok := r["aggregations"].(map[string]any)
	if !ok {
		return nil, ErrBucketsIsEmpty
	}

	buckets := aggregations["statics"].(map[string]any)["buckets"].([]any)
	if len(buckets) == 0 {
		return nil, ErrBucketsIsEmpty
	}

	statics := make(map[string]any, len(buckets))
	for _, bucket := range buckets {
		key := bucket.(map[string]any)["key"].(string)
		statics[key] = bucket
	}

	return statics, nil
}

未完待续


如果本文帮助到了你,帮我点个广告可以咩(o′┏▽┓`o)


评论
  目录