package dao import ( "encoding/json" "fmt" "service/app/mix/conf" "service/bizcommon/util" "service/dbstruct" "service/library/elasticsearchdb" "service/library/logger" "service/api/consts" streameracctproto "service/api/proto/streamer_acct/proto" "github.com/gin-gonic/gin" "github.com/olivere/elastic/v7" ) const ( EsEnvProd = 0 EsEnvTest = 1 EsEnvLocal = 2 ) type ElasticSearch struct { clientMix *elastic.Client } func NewElasticSearch(cfg *conf.ConfigSt) (es *ElasticSearch, err error) { es = new(ElasticSearch) es.clientMix, err = elasticsearchdb.NewElasticSearchClient(cfg.ElasticSearch) if err != nil { logger.Error("NewElasticSearchClient fail, cfg: %v, err: %v", util.ToJson(cfg.ElasticSearch), err) return } return } const ( IndexStreamerAcct = "streamer_acct" TypeStreamerAcct = "_doc" ) func (es *ElasticSearch) getIndexStreamerAcct() string { return IndexStreamerAcct } func (es *ElasticSearch) CreateStreamerAcct(ctx *gin.Context, streameraccts []*dbstruct.EsStreamerAcct) error { bulk := es.clientMix.Bulk().Index(es.getIndexStreamerAcct()).Refresh("true") for _, streameracct := range streameraccts { bulk.Add(elastic.NewBulkCreateRequest().Doc(streameracct)) } _, err := bulk.Do(ctx) return err } func (es *ElasticSearch) UpdateStreamerAcct(ctx *gin.Context, streameracct *dbstruct.EsStreamerAcct) error { _, err := es.clientMix.Update().Index(es.getIndexStreamerAcct()).Id(fmt.Sprint(streameracct.Mid)).Doc(map[string]any{ "name": streameracct.Name, "ut": streameracct.Ut, }).Do(ctx) return err } func (es *ElasticSearch) GetStreamerAcctListFuzzilyByName(ctx *gin.Context, req *streameracctproto.OpListFuzzilyByNameReq) (list []*dbstruct.EsStreamerAcct, err error) { list = make([]*dbstruct.EsStreamerAcct, 0) var nameClause elastic.Query // 查询文字长度小于短语匹配的,进行Term匹配,否则进行短语匹配 if len(req.Name) < consts.StreamerAcct_MatchPhraseLen { nameClause = elastic.NewTermQuery("name", req.Name) } else { nameClause = elastic.NewMatchPhraseQuery("name", req.Name) } query := elastic.NewBoolQuery() delFlagClause := elastic.NewMatchQuery("del_flag", 0) query.Must(nameClause, delFlagClause) res, err := es.clientMix.Search(es.getIndexStreamerAcct()).Query(query).From(req.Offset).Size(req.Limit).Sort("ct", true).Do(ctx) if err != nil { logger.Error("Search %v fail, err: %v", IndexStreamerAcct, err) return } for _, hit := range res.Hits.Hits { streameracct := &dbstruct.EsStreamerAcct{} err = json.Unmarshal(hit.Source, streameracct) if err != nil { logger.Error("json Unmarshal fail, err: %v", err) return } list = append(list, streameracct) } return } func (es *ElasticSearch) GetStreamerAcctListFuzzilyByUserId(ctx *gin.Context, req *streameracctproto.OpListFuzzilyByUserIdReq) (list []*dbstruct.EsStreamerAcct, err error) { list = make([]*dbstruct.EsStreamerAcct, 0) var nameClause elastic.Query var userIdStringClause elastic.Query // 查询文字长度小于短语匹配的,进行Term匹配,否则进行短语匹配 if len(req.UserIdString) < consts.StreamerAcct_MatchPhraseLen { nameClause = elastic.NewTermQuery("name", req.UserIdString) userIdStringClause = elastic.NewTermQuery("user_id_string", req.UserIdString) } else { nameClause = elastic.NewMatchQuery("name", req.UserIdString) userIdStringClause = elastic.NewMatchPhraseQuery("user_id_string", req.UserIdString) } orClause := elastic.NewBoolQuery().Should(nameClause, userIdStringClause) query := elastic.NewBoolQuery().Must() delFlagClause := elastic.NewMatchQuery("del_flag", 0) query.Must(orClause, delFlagClause) res, err := es.clientMix.Search(es.getIndexStreamerAcct()).Query(query).From(req.Offset).Size(req.Limit).Sort("ct", true).Do(ctx) if err != nil { logger.Error("Search %v fail, err: %v", IndexStreamerAcct, err) return } for _, hit := range res.Hits.Hits { streameracct := &dbstruct.EsStreamerAcct{} err = json.Unmarshal(hit.Source, streameracct) if err != nil { logger.Error("json Unmarshal fail, err: %v", err) return } list = append(list, streameracct) } return }