由于官方文档还停留在 v7 版本,特此书写 v8 版本示例,以便使用
安装
go get github.com/elastic/go-elasticsearch/v8
配置解释
type Elasticsearch struct {
Addresses []string // 使用的Elasticsearch节点列表。
Username string // 用于HTTP基本认证的用户名。
Password string // 用于HTTP基本认证的密码。
CloudID string // Elastic服务的端点(https://elastic.co/cloud)。
APIKey string // 授权用的Base64编码令牌;如果设置,将覆盖用户名/密码和服务令牌。
ServiceToken string // 授权用的服务令牌;如果设置,将覆盖用户名/密码。
CertificateFingerprint string // Elasticsearch首次启动时给出的SHA256十六进制指纹。
Header http.Header // 全局HTTP请求头。
// PEM编码的证书颁发机构。
// 设置时,将创建一个空的证书池,并将证书附加到其中。
// 该选项仅在未指定传输或使用http.Transport时有效。
CACert []byte
RetryOnStatus []int // 重试的状态码列表。默认值:502, 503, 504。
DisableRetry bool // 是否禁用重试。默认值:false。
MaxRetries int // 最大重试次数。默认值:3。
RetryOnError func(*http.Request, error) bool // 可选函数,指示哪些错误应重试。默认值:nil。
CompressRequestBody bool // 是否压缩请求体。默认值:false。
CompressRequestBodyLevel int // 请求体压缩级别。默认值:gzip.DefaultCompression。
DiscoverNodesOnStart bool // 初始化客户端时是否发现节点。默认值:false。
DiscoverNodesInterval time.Duration // 定期发现节点的间隔时间。默认值:禁用。
EnableMetrics bool // 是否启用指标收集。
EnableDebugLogger bool // 是否启用调试日志。
EnableCompatibilityMode bool // 是否发送兼容性头。
DisableMetaHeader bool // 是否禁用额外的“X-Elastic-Client-Meta” HTTP头。
RetryBackoff func(attempt int) time.Duration // 可选的退避时长。默认值:nil。
Transport http.RoundTripper // HTTP传输对象。
Logger elastictransport.Logger // 日志记录对象。
Selector elastictransport.Selector // 选择器对象。
// 自定义ConnectionPool的可选构造函数。默认值:nil。
ConnectionPoolFunc func([]*elastictransport.Connection, elastictransport.Selector) elastictransport.ConnectionPool
Instrumentation elastictransport.Instrumentation // 在客户端中启用仪表化。
}
封装
本处对 ES 进行初步封装,使其更符合本人用法,大家看着用ψ(`∇´)ψ
es.go
package elasticsearch
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v8"
"github.com/pkg/errors"
"io"
)
type ES struct {
client *elasticsearch.Client
}
// NewES cfg 的数据量较大,因此选择使用传地址
func NewES(cfg *elasticsearch.Config) (*ES, error) {
client, err := elasticsearch.NewClient(*cfg)
if err != nil {
return nil, err
}
es := &ES{
client: client,
}
return es, nil
}
// ResponseParse 响应解析函数
type ResponseParse func(body io.ReadCloser) (map[string]any, error)
// Search 搜索 API 的请求方法
func (es ES) Search(ctx context.Context, index string, body any, parser ResponseParse) (map[string]any, error) {
// 获取 ES 实例
buf := new(bytes.Buffer)
if err := json.NewEncoder(buf).Encode(body); err != nil {
return nil, errors.Wrap(err, "Error encoding query")
}
data, err := es.client.Search(
es.client.Search.WithContext(ctx),
es.client.Search.WithIndex(index),
es.client.Search.WithBody(buf),
es.client.Search.WithPretty(),
)
if err != nil {
return nil, errors.Wrap(err, "查询ES失败")
}
defer data.Body.Close()
if data.IsError() {
return nil, decodeErrorResponse(data.Body, data.Status())
}
return parser(data.Body)
}
// decodeErrorResponse 解析错误响应
func decodeErrorResponse(body io.ReadCloser, status string) error {
var e map[string]interface{}
if err := json.NewDecoder(body).Decode(&e); err != nil {
return errors.Wrap(err, "解析错误响应失败")
}
return errors.New(fmt.Sprintf("[%s] %s: %s", status, e["error"].(map[string]interface{})["type"], e["error"].(map[string]interface{})["reason"]))
}
default.go
package elasticsearch
import (
"SnapLink/internal/config"
"context"
"github.com/elastic/go-elasticsearch/v8"
"github.com/pkg/errors"
"github.com/zhufuyi/sponge/pkg/logger"
"sync"
)
var instance struct {
es *ES
once sync.Once
}
func esInstance() *ES {
instance.once.Do(
func() {
esConfig := config.Get().Elasticsearch
cfg := elasticsearch.Config{
Addresses: esConfig.Addresses,
Username: esConfig.Username,
Password: esConfig.Password,
CloudID: esConfig.CloudID,
APIKey: esConfig.APIKey,
ServiceToken: esConfig.ServiceToken,
CertificateFingerprint: esConfig.CertificateFingerprint,
Header: nil,
CACert: nil,
RetryOnStatus: esConfig.RetryOnStatus,
DisableRetry: esConfig.DisableRetry,
MaxRetries: esConfig.MaxRetries,
RetryOnError: nil,
CompressRequestBody: esConfig.CompressRequestBody,
CompressRequestBodyLevel: esConfig.CompressRequestBodyLevel,
DiscoverNodesOnStart: esConfig.DiscoverNodesOnStart,
DiscoverNodesInterval: esConfig.DiscoverNodesInterval,
EnableMetrics: esConfig.EnableMetrics,
EnableDebugLogger: esConfig.EnableDebugLogger,
EnableCompatibilityMode: esConfig.EnableCompatibilityMode,
DisableMetaHeader: esConfig.DisableMetaHeader,
RetryBackoff: nil,
Transport: nil,
Logger: nil,
Selector: nil,
ConnectionPoolFunc: nil,
Instrumentation: nil,
}
var err error
instance.es, err = NewES(&cfg)
if err != nil {
logger.Panic(errors.Wrap(err, "init default es failed").Error())
}
})
return instance.es
}
// Search ES 搜索 API
func Search(ctx context.Context, index string, body any, parser ResponseParse) (map[string]any, error) {
return esInstance().Search(ctx, index, body, parser)
}
用法示例
本处截取自我的开源项目 Snaplink
// 去 ES 中查询访问情况
func searchTodayAccessStatic(ctx context.Context, uris []string) (map[string]any, error) {
// 获取 ES 实例
body := map[string]any{
"size": 0,
"_source": false,
"query": map[string]any{
"terms": map[string]any{
"info.uri.keyword": uris,
},
},
"aggs": map[string]any{
"statics": map[string]any{
"terms": map[string]any{
"field": "info.uri.keyword",
},
"aggs": map[string]any{
"today_pv": map[string]any{
"value_count": map[string]any{
"field": "info.uri.keyword",
},
},
"today_uip": map[string]any{
"cardinality": map[string]any{
"field": "ip.keyword",
},
},
"today_uv": map[string]any{
"cardinality": map[string]any{
"field": "uid.keyword",
},
},
},
},
},
}
index := fmt.Sprintf("logstash-accesslog-%s.*", time.Now().Format("2006.01.02"))
return elasticsearch.Search(ctx, index, body, accessStaticResponseParser)
}
// accessStaticResponseParser 访问日志查询响应
func accessStaticResponseParser(body io.ReadCloser) (map[string]any, error) {
var r map[string]interface{}
if err := json.NewDecoder(body).Decode(&r); err != nil {
return nil, errors.Wrap(err, "解析响应失败")
}
aggregations, ok := r["aggregations"].(map[string]any)
if !ok {
return nil, ErrBucketsIsEmpty
}
buckets := aggregations["statics"].(map[string]any)["buckets"].([]any)
if len(buckets) == 0 {
return nil, ErrBucketsIsEmpty
}
statics := make(map[string]any, len(buckets))
for _, bucket := range buckets {
key := bucket.(map[string]any)["key"].(string)
statics[key] = bucket
}
return statics, nil
}