2024-05-11 10:22:32 +08:00
|
|
|
|
package dao
|
|
|
|
|
|
2024-05-11 19:29:33 +08:00
|
|
|
|
import (
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"fmt"
|
|
|
|
|
"service/app/mix/conf"
|
|
|
|
|
"service/bizcommon/util"
|
|
|
|
|
"service/dbstruct"
|
|
|
|
|
"service/library/elasticsearchdb"
|
|
|
|
|
"service/library/logger"
|
|
|
|
|
|
2024-05-14 22:31:33 +08:00
|
|
|
|
"service/api/consts"
|
2024-05-11 19:29:33 +08:00
|
|
|
|
streameracctproto "service/api/proto/streamer_acct/proto"
|
|
|
|
|
|
|
|
|
|
"github.com/gin-gonic/gin"
|
|
|
|
|
"github.com/olivere/elastic/v7"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Identifiers for the Elasticsearch environments known to this package.
// The numeric values are fixed: EsEnvProd is 0, EsEnvTest is 1 and
// EsEnvLocal is 2.
const (
	// EsEnvProd has the value 0.
	EsEnvProd = iota
	// EsEnvTest has the value 1.
	EsEnvTest
	// EsEnvLocal has the value 2.
	EsEnvLocal
)
|
|
|
|
|
|
2024-05-11 10:22:32 +08:00
|
|
|
|
type ElasticSearch struct {
|
2024-05-11 19:29:33 +08:00
|
|
|
|
clientMix *elastic.Client
|
|
|
|
|
indexsuffix string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewElasticSearch(cfg *conf.ConfigSt) (es *ElasticSearch, err error) {
|
|
|
|
|
es = new(ElasticSearch)
|
|
|
|
|
|
|
|
|
|
es.clientMix, err = elasticsearchdb.NewElasticSearchClient(cfg.ElasticSearch)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("NewElasticSearchClient fail, cfg: %v, err: %v", util.ToJson(cfg.ElasticSearch), err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
es.indexsuffix = cfg.ElasticSearch.IndexSuffix
|
|
|
|
|
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Naming constants for the streamer account documents.
const (
	// IndexStreamerAcct is the base index name; the configured index suffix
	// is appended to it before it is used in a request.
	IndexStreamerAcct = "streamer_acct"

	// TypeStreamerAcct is the document type name.
	TypeStreamerAcct = "_doc"
)
|
|
|
|
|
|
|
|
|
|
func (es *ElasticSearch) getIndexStreamerAcct() string {
|
|
|
|
|
return IndexStreamerAcct + es.indexsuffix
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (es *ElasticSearch) CreateStreamerAcct(ctx *gin.Context, streameraccts []*dbstruct.EsStreamerAcct) error {
|
|
|
|
|
bulk := es.clientMix.Bulk().Index(es.getIndexStreamerAcct()).Refresh("true")
|
|
|
|
|
for _, streameracct := range streameraccts {
|
|
|
|
|
bulk.Add(elastic.NewBulkCreateRequest().Doc(streameracct))
|
|
|
|
|
}
|
|
|
|
|
_, err := bulk.Do(ctx)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (es *ElasticSearch) UpdateStreamerAcct(ctx *gin.Context, streameracct *dbstruct.EsStreamerAcct) error {
|
|
|
|
|
_, err := es.clientMix.Update().Index(es.getIndexStreamerAcct()).Id(fmt.Sprint(streameracct.Mid)).Doc(map[string]any{
|
|
|
|
|
"name": streameracct.Name,
|
|
|
|
|
"ut": streameracct.Ut,
|
|
|
|
|
}).Do(ctx)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (es *ElasticSearch) GetStreamerAcctListFuzzilyByName(ctx *gin.Context, req *streameracctproto.OpListFuzzilyByNameReq) (list []*dbstruct.EsStreamerAcct, err error) {
|
|
|
|
|
|
|
|
|
|
list = make([]*dbstruct.EsStreamerAcct, 0)
|
|
|
|
|
|
2024-05-14 22:31:33 +08:00
|
|
|
|
var nameClause elastic.Query
|
|
|
|
|
// 查询文字长度小于短语匹配的,进行Term匹配,否则进行短语匹配
|
|
|
|
|
if len(req.Name) < consts.StreamerAcct_MatchPhraseLen {
|
|
|
|
|
nameClause = elastic.NewTermQuery("name", req.Name)
|
|
|
|
|
} else {
|
|
|
|
|
nameClause = elastic.NewMatchPhraseQuery("name", req.Name)
|
|
|
|
|
}
|
2024-05-11 19:29:33 +08:00
|
|
|
|
query := elastic.NewBoolQuery()
|
2024-05-14 22:31:33 +08:00
|
|
|
|
delFlagClause := elastic.NewMatchQuery("del_flag", 0)
|
2024-05-11 19:29:33 +08:00
|
|
|
|
query.Must(nameClause, delFlagClause)
|
|
|
|
|
|
|
|
|
|
res, err := es.clientMix.Search(es.getIndexStreamerAcct()).Query(query).From(req.Offset).Size(req.Limit).Sort("ct", true).Do(ctx)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("Search %v fail, err: %v", IndexStreamerAcct, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, hit := range res.Hits.Hits {
|
|
|
|
|
streameracct := &dbstruct.EsStreamerAcct{}
|
|
|
|
|
err = json.Unmarshal(hit.Source, streameracct)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("json Unmarshal fail, err: %v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
list = append(list, streameracct)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (es *ElasticSearch) GetStreamerAcctListFuzzilyByUserId(ctx *gin.Context, req *streameracctproto.OpListFuzzilyByUserIdReq) (list []*dbstruct.EsStreamerAcct, err error) {
|
|
|
|
|
|
|
|
|
|
list = make([]*dbstruct.EsStreamerAcct, 0)
|
|
|
|
|
|
2024-05-14 22:31:33 +08:00
|
|
|
|
var nameClause elastic.Query
|
|
|
|
|
var userIdStringClause elastic.Query
|
|
|
|
|
// 查询文字长度小于短语匹配的,进行Term匹配,否则进行短语匹配
|
|
|
|
|
if len(req.UserIdString) < consts.StreamerAcct_MatchPhraseLen {
|
|
|
|
|
nameClause = elastic.NewTermQuery("name", req.UserIdString)
|
|
|
|
|
userIdStringClause = elastic.NewTermQuery("user_id_string", req.UserIdString)
|
|
|
|
|
} else {
|
|
|
|
|
nameClause = elastic.NewMatchQuery("name", req.UserIdString)
|
|
|
|
|
userIdStringClause = elastic.NewMatchPhraseQuery("user_id_string", req.UserIdString)
|
|
|
|
|
}
|
2024-05-11 19:29:33 +08:00
|
|
|
|
orClause := elastic.NewBoolQuery().Should(nameClause, userIdStringClause)
|
|
|
|
|
query := elastic.NewBoolQuery().Must()
|
2024-05-14 22:31:33 +08:00
|
|
|
|
delFlagClause := elastic.NewMatchQuery("del_flag", 0)
|
2024-05-11 19:29:33 +08:00
|
|
|
|
query.Must(orClause, delFlagClause)
|
|
|
|
|
|
|
|
|
|
res, err := es.clientMix.Search(es.getIndexStreamerAcct()).Query(query).From(req.Offset).Size(req.Limit).Sort("ct", true).Do(ctx)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("Search %v fail, err: %v", IndexStreamerAcct, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, hit := range res.Hits.Hits {
|
|
|
|
|
streameracct := &dbstruct.EsStreamerAcct{}
|
|
|
|
|
err = json.Unmarshal(hit.Source, streameracct)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("json Unmarshal fail, err: %v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
list = append(list, streameracct)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return
|
2024-05-11 10:22:32 +08:00
|
|
|
|
}
|