package eapi

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"gadmin/config"
	"strings"

	"github.com/sirupsen/logrus"
	"github.com/spf13/cast"
)

// HasIndex reports whether an index with the given name exists.
//
// It lists shards through the client's Cat.Shards API and looks for a line
// whose first space-delimited column equals index. An empty index never
// matches. A transport error or an error-status response is returned as err.
func HasIndex(ctx context.Context, index string) (has bool, err error) {
	client := config.GetElastic().Cat.Shards
	res, err := client(
		client.WithContext(ctx),
	)
	// res is nil when the request itself failed, so the body may only be
	// closed after the error check.
	if err != nil {
		return false, err
	}
	defer res.Body.Close()

	if res.IsError() {
		return false, errors.New(res.String())
	}

	shards, err := String(res.Body)
	if err != nil {
		return false, err
	}

	for _, line := range strings.Split(shards, "\n") {
		// The index name is the first column of every shard line.
		name, _, _ := strings.Cut(line, " ")
		if name != "" && name == index {
			return true, nil
		}
	}
	return false, nil
}

// CreateIndex creates the index using dsl as the request body.
//
// It returns an error when index is empty and is a no-op (nil error) when
// HasIndex reports that the index already exists.
func CreateIndex(ctx context.Context, index string, dsl string) (err error) {
	if index == "" {
		return errors.New("index names cannot be empty")
	}

	hasIndex, err := HasIndex(ctx, index)
	if err != nil || hasIndex {
		return err
	}

	client := config.GetElastic().Indices.Create
	// Create the new index.
	res, err := client(
		index,
		client.WithContext(ctx),
		client.WithBody(strings.NewReader(dsl)),
	)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if res.IsError() {
		return errors.New(res.String())
	}
	return nil
}

// Bulk performs a batch of operations against index.
//
// The corresponding Elasticsearch REST API is:
//
//	POST /<index>/_bulk
//	{ "index" : { "_id" : "1" } }
//	{ "field1" : "value1" }
//	{ "delete" : { "_id" : "2" } }
//	{ "create" : { "_id" : "3" } }
//	{ "field1" : "value3" }
//	{ "update" : {"_id" : "1" } }
//	{ "doc" : {"field2" : "value2"} }
//
// For index, create and update operations the submitted data consists of two
// lines: the first line is the meta data describing the operation and the
// second line is the actual payload. A delete operation has only the meta
// line. body must already be in that format.
//
// The response of a successful request is logged at warning level.
func Bulk(ctx context.Context, index string, body *bytes.Buffer) (err error) {
	client := config.GetElastic().Bulk
	res, err := client(
		body,
		client.WithContext(ctx),
		client.WithIndex(index),
	)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if res.IsError() {
		return errors.New(res.String())
	}

	// Warn, not Warnf: the response text must not be interpreted as a format
	// string.
	logrus.Warn(res.String())
	return nil
}

// InsertToEs inserts v, JSON-encoded, into index through the client's Create
// API. id is converted to a string with cast.ToString and used as the
// document id.
func InsertToEs(ctx context.Context, index string, id, v any) (err error) {
	body := &bytes.Buffer{}
	if err = json.NewEncoder(body).Encode(v); err != nil {
		return err
	}

	client := config.GetElastic().Create
	// Insert the document.
	res, err := client(
		index,
		cast.ToString(id),
		body,
		client.WithContext(ctx),
	)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if res.IsError() {
		return errors.New(res.String())
	}
	return nil
}

// Search runs query against index and returns the source of every hit wrapped
// with NewVar. sources is nil when the response decodes to nothing or there
// are no hits.
func Search(ctx context.Context, index string, query map[string]interface{}) (sources []*Var, err error) {
	// Example query:
	//
	//query := map[string]interface{}{
	//	"query": map[string]interface{}{
	//		"match_phrase": map[string]interface{}{
	//			"extra": `{"type":1}`,
	//		},
	//	},
	//	"aggs": map[string]interface{}{ // shorthand for an aggregation query
	//		"count_name": map[string]interface{}{ // name given to the aggregation
	//			"value_count": map[string]interface{}{
	//				"field": "name.keyword",
	//			},
	//		},
	//	},
	//	//"from": 0,
	//	//"size": 1,
	//	//"sort": []map[string]interface{}{
	//	//	{"pubDate": []map[string]interface{}{
	//	//		{"order": "desc"},
	//	//	},
	//	//	},
	//	//},
	//}
	marshal, err := json.Marshal(query)
	if err != nil {
		return nil, err
	}

	client := config.GetElastic().Search
	res, err := client(
		client.WithContext(ctx),
		client.WithIndex(index),
		client.WithBody(bytes.NewReader(marshal)),
	)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.IsError() {
		return nil, errors.New(res.String())
	}

	var search *SearchModel
	if err = Unmarshal(res.Body, &search); err != nil {
		return nil, err
	}
	if search == nil {
		return nil, nil
	}

	for _, hit := range search.Hits.Hits {
		sources = append(sources, NewVar(hit.Source))
	}
	return sources, nil
}

// Count returns the number of documents in index matched by query, as
// reported by the client's Count API. total is 0 when the response decodes to
// nothing.
func Count(ctx context.Context, index string, query map[string]interface{}) (total int64, err error) {
	marshal, err := json.Marshal(query)
	if err != nil {
		return 0, err
	}

	client := config.GetElastic().Count
	res, err := client(
		client.WithContext(ctx),
		client.WithIndex(index),
		client.WithBody(bytes.NewReader(marshal)),
	)
	if err != nil {
		return 0, err
	}
	defer res.Body.Close()

	if res.IsError() {
		return 0, errors.New(res.String())
	}

	var count *CountModel
	if err = Unmarshal(res.Body, &count); err != nil {
		return 0, err
	}
	// Guard against a body that decodes to a nil model.
	if count == nil {
		return 0, nil
	}
	return count.Count, nil
}

// DeleteByQuery deletes the documents matched by query from the given
// indices. The request is sent with conflicts=proceed and refresh=true.
func DeleteByQuery(ctx context.Context, index []string, query map[string]interface{}) (err error) {
	marshal, err := json.Marshal(query)
	if err != nil {
		return err
	}

	client := config.GetElastic().DeleteByQuery
	res, err := client(
		index,
		bytes.NewReader(marshal),
		client.WithContext(ctx),
		client.WithConflicts("proceed"),
		client.WithRefresh(true),
	)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if res.IsError() {
		return errors.New(res.String())
	}
	//logrus.Warn(res.String())
	return nil
}

// Cardinality returns the number of distinct values of field among the
// documents of index matched by query, using a "cardinality" aggregation.
//
// A nil query is treated as an empty one. Note that a non-nil query map is
// modified in place: its "_source", "size" and "aggs" keys are overwritten.
func Cardinality(ctx context.Context, index string, query map[string]interface{}, field string) (total int64, err error) {
	// Writing to a nil map panics, so allocate one when the caller passed nil.
	if query == nil {
		query = make(map[string]interface{})
	}
	query["_source"] = false
	query["size"] = 0
	query["aggs"] = M{
		"count": M{
			"cardinality": M{
				"field": field,
			},
		},
	}

	marshal, err := json.Marshal(query)
	if err != nil {
		return 0, err
	}

	client := config.GetElastic().Search
	res, err := client(
		client.WithContext(ctx),
		client.WithIndex(index),
		client.WithBody(bytes.NewReader(marshal)),
	)
	if err != nil {
		return 0, err
	}
	defer res.Body.Close()

	if res.IsError() {
		return 0, errors.New(res.String())
	}

	var count *CardinalityModel
	if err = Unmarshal(res.Body, &count); err != nil {
		return 0, err
	}
	// Guard against a body that decodes to a nil model.
	if count == nil {
		return 0, nil
	}
	return count.Aggregations.Count.Value, nil
}

// First returns the first document of index when sorted by field in ascending
// order. val is nil when there is no data.
func First(ctx context.Context, index string, field string) (val *Var, err error) {
	query := make(map[string]interface{})
	query["sort"] = M{
		field: "asc",
	}
	query["size"] = 1

	marshal, err := json.Marshal(query)
	if err != nil {
		return nil, err
	}

	client := config.GetElastic().Search
	res, err := client(
		client.WithContext(ctx),
		client.WithIndex(index),
		client.WithBody(bytes.NewReader(marshal)),
	)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.IsError() {
		return nil, errors.New(res.String())
	}

	var models *FirstModel
	if err = Unmarshal(res.Body, &models); err != nil {
		return nil, err
	}
	// No data.
	if models == nil || len(models.Hits.Hits) == 0 {
		return nil, nil
	}
	return NewVar(models.Hits.Hits[0].Source), nil
}

// FirstSan fetches the document returned by First and scans it into v with
// Var.Scan. It leaves v untouched and returns nil when there is no data.
func FirstSan(ctx context.Context, index string, field string, v any) (err error) {
	val, err := First(ctx, index, field)
	if err != nil {
		return err
	}
	// First yields a nil value when the index holds no matching document.
	if val == nil {
		return nil
	}
	return val.Scan(&v)
}

// Last returns the first document of index matched by query when sorted by
// field in descending order. val is nil when there is no data.
//
// A nil query is treated as an empty one. Note that a non-nil query map is
// modified in place: its "sort" and "size" keys are overwritten.
func Last(ctx context.Context, index string, query map[string]interface{}, field string) (val *Var, err error) {
	if query == nil {
		query = make(map[string]interface{})
	}
	query["sort"] = M{
		field: "desc",
	}
	query["size"] = 1

	marshal, err := json.Marshal(query)
	if err != nil {
		return nil, err
	}

	client := config.GetElastic().Search
	res, err := client(
		client.WithContext(ctx),
		client.WithIndex(index),
		client.WithBody(bytes.NewReader(marshal)),
	)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.IsError() {
		return nil, errors.New(res.String())
	}

	var models *FirstModel
	if err = Unmarshal(res.Body, &models); err != nil {
		return nil, err
	}
	// No data.
	if models == nil || len(models.Hits.Hits) == 0 {
		return nil, nil
	}
	return NewVar(models.Hits.Hits[0].Source), nil
}

// LastSan fetches the document returned by Last and scans it into v with
// Var.Scan. It leaves v untouched and returns nil when there is no data.
func LastSan(ctx context.Context, index string, query map[string]interface{}, field string, v any) (err error) {
	val, err := Last(ctx, index, query, field)
	if err != nil {
		return err
	}
	if val == nil {
		return nil
	}
	return val.Scan(&v)
}

// TopList returns ranking-list data for index ([index][user_id:score]): the
// buckets of a "terms" aggregation on field, limited to size buckets, over
// the documents matched by query.
//
// A nil query is treated as an empty one. Note that a non-nil query map is
// modified in place: its "_source" and "aggs" keys are overwritten.
func TopList(ctx context.Context, index string, query map[string]interface{}, field string, size int) (buckets []TopBucket, err error) {
	if query == nil {
		query = make(M)
	}
	query["_source"] = false
	query["aggs"] = M{
		"vals": M{
			"terms": M{
				"size":  size,
				"field": field,
			},
		},
	}
	//logrus.Warnf("TopList index:%v\n where:%v\n", index, utility.DumpToJSON(query))

	marshal, err := json.Marshal(query)
	if err != nil {
		return nil, err
	}

	client := config.GetElastic().Search
	res, err := client(
		client.WithContext(ctx),
		client.WithIndex(index),
		client.WithBody(bytes.NewReader(marshal)),
	)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.IsError() {
		return nil, errors.New(res.String())
	}

	var models *TopListModel
	if err = Unmarshal(res.Body, &models); err != nil {
		return nil, err
	}
	// No data.
	if models == nil {
		return nil, nil
	}
	return models.Aggregations.Vals.Buckets, nil
}

// TopSumScore returns a ranking list over an aggregated field: documents
// matched by query are grouped by field with a "terms" aggregation, each
// group carries the sum of sumField as "sum_score", and the groups are
// ordered by that sum in descending order, limited to size buckets.
//
// A nil query is treated as an empty one. Note that a non-nil query map is
// modified in place: its "size" and "aggs" keys are overwritten.
func TopSumScore(ctx context.Context, index string, query map[string]interface{}, field, sumField string, size int) (buckets []TopSumScoreBucket, err error) {
	if query == nil {
		query = make(M)
	}
	query["size"] = 0
	query["aggs"] = M{
		"vals": M{
			"terms": M{
				"size":  size,
				"field": field,
				"order": M{
					"sum_score": "desc",
				},
			},
			"aggs": M{
				"sum_score": M{
					"sum": M{
						"field": sumField,
					},
				},
			},
		},
	}

	marshal, err := json.Marshal(query)
	if err != nil {
		return nil, err
	}

	client := config.GetElastic().Search
	res, err := client(
		client.WithContext(ctx),
		client.WithIndex(index),
		client.WithBody(bytes.NewReader(marshal)),
	)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.IsError() {
		return nil, errors.New(res.String())
	}

	var models *TopSumScoreListModel
	if err = Unmarshal(res.Body, &models); err != nil {
		return nil, err
	}
	// No data.
	if models == nil {
		return nil, nil
	}
	return models.Aggregations.Vals.Buckets, nil
}

// ExistElastic reports the result of config.ExistElastic.
func ExistElastic() bool {
	return config.ExistElastic()
}