service/app/mix/dao/elasticsearch.go

160 lines
4.9 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package dao
import (
"encoding/json"
"fmt"
"service/app/mix/conf"
"service/bizcommon/util"
"service/dbstruct"
"service/library/elasticsearchdb"
"service/library/logger"
"unicode"
"service/api/consts"
streameracctproto "service/api/proto/streamer_acct/proto"
"github.com/gin-gonic/gin"
"github.com/olivere/elastic/v7"
)
const (
EsEnvProd = 0
EsEnvTest = 1
EsEnvLocal = 2
)
type ElasticSearch struct {
clientMix *elastic.Client
}
func NewElasticSearch(cfg *conf.ConfigSt) (es *ElasticSearch, err error) {
es = new(ElasticSearch)
es.clientMix, err = elasticsearchdb.NewElasticSearchClient(cfg.ElasticSearch)
if err != nil {
logger.Error("NewElasticSearchClient fail, cfg: %v, err: %v", util.ToJson(cfg.ElasticSearch), err)
return
}
return
}
const (
IndexStreamerAcct = "streamer_acct"
TypeStreamerAcct = "_doc"
)
func (es *ElasticSearch) getIndexStreamerAcct() string {
return IndexStreamerAcct
}
func (es *ElasticSearch) CreateStreamerAcct(ctx *gin.Context, streameraccts []*dbstruct.EsStreamerAcct) error {
bulk := es.clientMix.Bulk().Index(es.getIndexStreamerAcct()).Refresh("true")
for _, streameracct := range streameraccts {
bulk.Add(elastic.NewBulkCreateRequest().Id(fmt.Sprint(streameracct.Mid)).Doc(streameracct))
}
_, err := bulk.Do(ctx)
return err
}
func (es *ElasticSearch) UpdateStreamerAcct(ctx *gin.Context, streameracct *dbstruct.EsStreamerAcct) error {
_, err := es.clientMix.Update().Index(es.getIndexStreamerAcct()).Id(fmt.Sprint(streameracct.Mid)).Doc(map[string]any{
"name": streameracct.Name,
"pinyin": streameracct.PinYin,
"ut": streameracct.Ut,
}).Do(ctx)
return err
}
func (es *ElasticSearch) GetStreamerAcctListFuzzilyByName(ctx *gin.Context, req *streameracctproto.OpListFuzzilyByNameReq) (list []*dbstruct.EsStreamerAcct, err error) {
list = make([]*dbstruct.EsStreamerAcct, 0)
var nameClause elastic.Query
var pinyinClause elastic.Query
// 查询文字长度小于短语匹配的进行Term匹配否则进行短语匹配
len := 0
for _, char := range req.Name {
if unicode.Is(unicode.Han, char) {
len++
} else {
len++
}
}
if len < consts.StreamerAcct_MatchPhraseLen {
nameClause = elastic.NewTermQuery("name.short_char", req.Name)
pinyinClause = elastic.NewTermQuery("pinyin.short_char", req.Name)
} else {
nameClause = elastic.NewMatchPhraseQuery("name.long_char", req.Name)
pinyinClause = elastic.NewMatchPhraseQuery("pinyin.long_char", req.Name)
}
orClause := elastic.NewBoolQuery().Should(nameClause, pinyinClause)
delFlagClause := elastic.NewMatchQuery("del_flag", 0)
mustnotClause := elastic.NewTermsQueryFromStrings("_id", req.BlockedFromBeingSearchedList...)
query := elastic.NewBoolQuery().Must(orClause, delFlagClause).MustNot(mustnotClause)
res, err := es.clientMix.Search(es.getIndexStreamerAcct()).Query(query).From(req.Offset).Size(req.Limit).Sort("ct", true).Do(ctx)
if err != nil {
logger.Error("Search %v fail, err: %v", IndexStreamerAcct, err)
return
}
for _, hit := range res.Hits.Hits {
streameracct := &dbstruct.EsStreamerAcct{}
err = json.Unmarshal(hit.Source, streameracct)
if err != nil {
logger.Error("json Unmarshal fail, err: %v", err)
return
}
list = append(list, streameracct)
}
return
}
func (es *ElasticSearch) GetStreamerAcctListFuzzilyByUserId(ctx *gin.Context, req *streameracctproto.OpListFuzzilyByUserIdReq) (list []*dbstruct.EsStreamerAcct, err error) {
list = make([]*dbstruct.EsStreamerAcct, 0)
var nameClause elastic.Query
var userIdStringClause elastic.Query
// 查询文字长度小于短语匹配的进行Term匹配否则进行短语匹配
len := 0
for _, char := range req.UserIdString {
if unicode.Is(unicode.Han, char) {
len++
} else {
len++
}
}
if len < consts.StreamerAcct_MatchPhraseLen {
nameClause = elastic.NewTermQuery("name.short_char", req.UserIdString)
userIdStringClause = elastic.NewTermQuery("user_id_string.short_char", req.UserIdString)
} else {
nameClause = elastic.NewMatchPhraseQuery("name.long_char", req.UserIdString)
userIdStringClause = elastic.NewMatchPhraseQuery("user_id_string.long_char", req.UserIdString)
}
orClause := elastic.NewBoolQuery().Should(nameClause, userIdStringClause)
delFlagClause := elastic.NewMatchQuery("del_flag", 0)
mustnotClause := elastic.NewTermsQueryFromStrings("_id", req.BlockedFromBeingSearchedList...)
query := elastic.NewBoolQuery().Must(orClause, delFlagClause).MustNot(mustnotClause)
res, err := es.clientMix.Search(es.getIndexStreamerAcct()).Query(query).From(req.Offset).Size(req.Limit).Sort("ct", true).Do(ctx)
if err != nil {
logger.Error("Search %v fail, err: %v", IndexStreamerAcct, err)
return
}
for _, hit := range res.Hits.Hits {
streameracct := &dbstruct.EsStreamerAcct{}
err = json.Unmarshal(hit.Source, streameracct)
if err != nil {
logger.Error("json Unmarshal fail, err: %v", err)
return
}
list = append(list, streameracct)
}
return
}