2024-05-11 10:22:32 +08:00
|
|
|
|
package dao
|
|
|
|
|
|
2024-05-11 19:29:33 +08:00
|
|
|
|
import (
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"fmt"
|
|
|
|
|
"service/app/mix/conf"
|
|
|
|
|
"service/bizcommon/util"
|
|
|
|
|
"service/dbstruct"
|
|
|
|
|
"service/library/elasticsearchdb"
|
|
|
|
|
"service/library/logger"
|
2024-08-15 07:07:39 +08:00
|
|
|
|
"time"
|
2024-05-14 22:58:12 +08:00
|
|
|
|
"unicode"
|
2024-05-11 19:29:33 +08:00
|
|
|
|
|
2024-05-14 22:31:33 +08:00
|
|
|
|
"service/api/consts"
|
2024-05-11 19:29:33 +08:00
|
|
|
|
streameracctproto "service/api/proto/streamer_acct/proto"
|
|
|
|
|
|
|
|
|
|
"github.com/gin-gonic/gin"
|
|
|
|
|
"github.com/olivere/elastic/v7"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// EsEnv* are integer selectors for an Elasticsearch environment
// (prod / test / local, judging by their names). They are not referenced in
// the visible part of this file.
const (
	EsEnvProd  = iota // 0
	EsEnvTest         // 1
	EsEnvLocal        // 2
)
|
|
|
|
|
|
2024-05-11 10:22:32 +08:00
|
|
|
|
type ElasticSearch struct {
|
2024-05-14 22:43:52 +08:00
|
|
|
|
clientMix *elastic.Client
|
2024-05-11 19:29:33 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewElasticSearch(cfg *conf.ConfigSt) (es *ElasticSearch, err error) {
|
|
|
|
|
es = new(ElasticSearch)
|
|
|
|
|
|
|
|
|
|
es.clientMix, err = elasticsearchdb.NewElasticSearchClient(cfg.ElasticSearch)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("NewElasticSearchClient fail, cfg: %v, err: %v", util.ToJson(cfg.ElasticSearch), err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Elasticsearch coordinates for streamer-account documents.
const (
	// IndexStreamerAcct is the index name returned by getIndexStreamerAcct
	// and used by every streamer-account request in this file.
	IndexStreamerAcct = "new_streamer_acct"

	// TypeStreamerAcct is the mapping type name; it is not referenced in the
	// visible part of this file.
	TypeStreamerAcct = "_doc"
)
|
|
|
|
|
|
|
|
|
|
func (es *ElasticSearch) getIndexStreamerAcct() string {
|
2024-05-14 22:43:52 +08:00
|
|
|
|
return IndexStreamerAcct
|
2024-05-11 19:29:33 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (es *ElasticSearch) CreateStreamerAcct(ctx *gin.Context, streameraccts []*dbstruct.EsStreamerAcct) error {
|
|
|
|
|
bulk := es.clientMix.Bulk().Index(es.getIndexStreamerAcct()).Refresh("true")
|
|
|
|
|
for _, streameracct := range streameraccts {
|
2024-05-24 23:27:05 +08:00
|
|
|
|
bulk.Add(elastic.NewBulkCreateRequest().Id(fmt.Sprint(streameracct.Mid)).Doc(streameracct))
|
2024-05-11 19:29:33 +08:00
|
|
|
|
}
|
2024-05-24 23:27:51 +08:00
|
|
|
|
_, err := bulk.Do(ctx)
|
2024-05-11 19:29:33 +08:00
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (es *ElasticSearch) UpdateStreamerAcct(ctx *gin.Context, streameracct *dbstruct.EsStreamerAcct) error {
|
|
|
|
|
_, err := es.clientMix.Update().Index(es.getIndexStreamerAcct()).Id(fmt.Sprint(streameracct.Mid)).Doc(map[string]any{
|
2024-06-13 12:56:29 +08:00
|
|
|
|
"name": streameracct.Name,
|
|
|
|
|
"pinyin": streameracct.PinYin,
|
|
|
|
|
"ut": streameracct.Ut,
|
2024-05-11 19:29:33 +08:00
|
|
|
|
}).Do(ctx)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
2024-08-15 07:07:39 +08:00
|
|
|
|
func (es *ElasticSearch) UpdateStreamerAcctSelectively(ctx *gin.Context, streameracct *dbstruct.EsStreamerAcctUpdater) error {
|
|
|
|
|
_, err := es.clientMix.Update().Index(es.getIndexStreamerAcct()).Id(fmt.Sprint(streameracct.GetMid())).Doc(util.EntityToM(streameracct)).Do(ctx)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (es *ElasticSearch) UpdateStreamerAcctSelectivelyByMids(ctx *gin.Context, streameracct *dbstruct.EsStreamerAcctUpdater, mids []int64) error {
|
|
|
|
|
mp := util.EntityToM(streameracct)
|
|
|
|
|
bulk := es.clientMix.Bulk().Index(es.getIndexStreamerAcct()).Refresh("true")
|
|
|
|
|
for _, mid := range mids {
|
|
|
|
|
bulk.Add(elastic.NewBulkUpdateRequest().Id(fmt.Sprint(mid)).Doc(mp))
|
|
|
|
|
}
|
|
|
|
|
_, err := bulk.Do(ctx)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
2024-05-11 19:29:33 +08:00
|
|
|
|
func (es *ElasticSearch) GetStreamerAcctListFuzzilyByName(ctx *gin.Context, req *streameracctproto.OpListFuzzilyByNameReq) (list []*dbstruct.EsStreamerAcct, err error) {
|
|
|
|
|
|
|
|
|
|
list = make([]*dbstruct.EsStreamerAcct, 0)
|
|
|
|
|
|
2024-05-14 22:31:33 +08:00
|
|
|
|
var nameClause elastic.Query
|
2024-06-13 12:29:14 +08:00
|
|
|
|
var pinyinClause elastic.Query
|
2024-05-14 22:31:33 +08:00
|
|
|
|
// 查询文字长度小于短语匹配的,进行Term匹配,否则进行短语匹配
|
2024-05-14 22:58:12 +08:00
|
|
|
|
len := 0
|
|
|
|
|
for _, char := range req.Name {
|
|
|
|
|
if unicode.Is(unicode.Han, char) {
|
|
|
|
|
len++
|
|
|
|
|
} else {
|
|
|
|
|
len++
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if len < consts.StreamerAcct_MatchPhraseLen {
|
2024-05-14 23:01:58 +08:00
|
|
|
|
nameClause = elastic.NewTermQuery("name.short_char", req.Name)
|
2024-06-13 12:29:14 +08:00
|
|
|
|
pinyinClause = elastic.NewTermQuery("pinyin.short_char", req.Name)
|
2024-05-14 22:31:33 +08:00
|
|
|
|
} else {
|
2024-05-14 23:01:58 +08:00
|
|
|
|
nameClause = elastic.NewMatchPhraseQuery("name.long_char", req.Name)
|
2024-06-13 12:29:14 +08:00
|
|
|
|
pinyinClause = elastic.NewMatchPhraseQuery("pinyin.long_char", req.Name)
|
2024-05-14 22:31:33 +08:00
|
|
|
|
}
|
2024-06-13 12:29:14 +08:00
|
|
|
|
orClause := elastic.NewBoolQuery().Should(nameClause, pinyinClause)
|
2024-05-14 22:31:33 +08:00
|
|
|
|
delFlagClause := elastic.NewMatchQuery("del_flag", 0)
|
2024-06-21 20:28:40 +08:00
|
|
|
|
mustnotClause := elastic.NewTermsQueryFromStrings("_id", req.BlockedFromBeingSearchedList...)
|
|
|
|
|
query := elastic.NewBoolQuery().Must(orClause, delFlagClause).MustNot(mustnotClause)
|
2024-05-11 19:29:33 +08:00
|
|
|
|
|
|
|
|
|
res, err := es.clientMix.Search(es.getIndexStreamerAcct()).Query(query).From(req.Offset).Size(req.Limit).Sort("ct", true).Do(ctx)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("Search %v fail, err: %v", IndexStreamerAcct, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, hit := range res.Hits.Hits {
|
|
|
|
|
streameracct := &dbstruct.EsStreamerAcct{}
|
|
|
|
|
err = json.Unmarshal(hit.Source, streameracct)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("json Unmarshal fail, err: %v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
list = append(list, streameracct)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (es *ElasticSearch) GetStreamerAcctListFuzzilyByUserId(ctx *gin.Context, req *streameracctproto.OpListFuzzilyByUserIdReq) (list []*dbstruct.EsStreamerAcct, err error) {
|
|
|
|
|
|
|
|
|
|
list = make([]*dbstruct.EsStreamerAcct, 0)
|
|
|
|
|
|
2024-05-14 22:31:33 +08:00
|
|
|
|
var nameClause elastic.Query
|
|
|
|
|
var userIdStringClause elastic.Query
|
|
|
|
|
// 查询文字长度小于短语匹配的,进行Term匹配,否则进行短语匹配
|
2024-05-14 22:58:12 +08:00
|
|
|
|
len := 0
|
|
|
|
|
for _, char := range req.UserIdString {
|
|
|
|
|
if unicode.Is(unicode.Han, char) {
|
|
|
|
|
len++
|
|
|
|
|
} else {
|
|
|
|
|
len++
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if len < consts.StreamerAcct_MatchPhraseLen {
|
2024-05-14 23:01:58 +08:00
|
|
|
|
nameClause = elastic.NewTermQuery("name.short_char", req.UserIdString)
|
|
|
|
|
userIdStringClause = elastic.NewTermQuery("user_id_string.short_char", req.UserIdString)
|
2024-05-14 22:31:33 +08:00
|
|
|
|
} else {
|
2024-05-14 23:01:58 +08:00
|
|
|
|
nameClause = elastic.NewMatchPhraseQuery("name.long_char", req.UserIdString)
|
|
|
|
|
userIdStringClause = elastic.NewMatchPhraseQuery("user_id_string.long_char", req.UserIdString)
|
2024-05-14 22:31:33 +08:00
|
|
|
|
}
|
2024-05-11 19:29:33 +08:00
|
|
|
|
orClause := elastic.NewBoolQuery().Should(nameClause, userIdStringClause)
|
2024-05-14 22:31:33 +08:00
|
|
|
|
delFlagClause := elastic.NewMatchQuery("del_flag", 0)
|
2024-06-21 20:28:40 +08:00
|
|
|
|
mustnotClause := elastic.NewTermsQueryFromStrings("_id", req.BlockedFromBeingSearchedList...)
|
|
|
|
|
query := elastic.NewBoolQuery().Must(orClause, delFlagClause).MustNot(mustnotClause)
|
2024-05-11 19:29:33 +08:00
|
|
|
|
|
|
|
|
|
res, err := es.clientMix.Search(es.getIndexStreamerAcct()).Query(query).From(req.Offset).Size(req.Limit).Sort("ct", true).Do(ctx)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("Search %v fail, err: %v", IndexStreamerAcct, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, hit := range res.Hits.Hits {
|
|
|
|
|
streameracct := &dbstruct.EsStreamerAcct{}
|
|
|
|
|
err = json.Unmarshal(hit.Source, streameracct)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("json Unmarshal fail, err: %v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
list = append(list, streameracct)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return
|
2024-05-11 10:22:32 +08:00
|
|
|
|
}
|
2024-08-15 07:07:39 +08:00
|
|
|
|
|
2024-08-16 07:06:12 +08:00
|
|
|
|
func (es *ElasticSearch) FilterStreamerAcctList(ctx *gin.Context, req *streameracctproto.OpFilterReq) (list []*dbstruct.EsStreamerAcct, scorelist []float64, fullscore float64, err error) {
|
2024-08-15 07:07:39 +08:00
|
|
|
|
|
|
|
|
|
list = make([]*dbstruct.EsStreamerAcct, 0)
|
2024-08-16 06:14:55 +08:00
|
|
|
|
scorelist = make([]float64, 0)
|
|
|
|
|
fullscore = 0
|
2024-08-15 07:07:39 +08:00
|
|
|
|
|
2024-08-16 06:14:55 +08:00
|
|
|
|
// 查询范围
|
2024-08-15 07:07:39 +08:00
|
|
|
|
delFlagClause := elastic.NewMatchQuery("del_flag", 0)
|
|
|
|
|
mustnotClause := elastic.NewTermsQueryFromStrings("_id", req.BlockedFromBeingSearchedList...)
|
2024-08-16 06:14:55 +08:00
|
|
|
|
condition := elastic.NewBoolQuery().Must(delFlagClause).MustNot(mustnotClause).Boost(1.0)
|
|
|
|
|
|
|
|
|
|
// 建立得分机制
|
|
|
|
|
funcScoreQuery := elastic.NewFunctionScoreQuery().Query(condition)
|
|
|
|
|
funcScoreQuery.ScoreMode("sum")
|
|
|
|
|
funcScoreQuery.BoostMode("replace")
|
|
|
|
|
funcScoreQuery.MinScore(0)
|
|
|
|
|
funcScoreQuery.MaxBoost(3.4028235e38)
|
|
|
|
|
funcScoreQuery.Boost(1.0)
|
|
|
|
|
|
2024-08-20 17:35:50 +08:00
|
|
|
|
// 写入关于年龄、粉丝量、身高、体重、微信价格、空间价格的高斯衰减打分
|
2024-08-16 06:14:55 +08:00
|
|
|
|
matchall := elastic.NewMatchAllQuery().Boost(1.0)
|
|
|
|
|
agefunc := util.CreateGaussDecayFunction("age", req.Age, req.RefStreamerFilter.Age)
|
|
|
|
|
fansfunc := util.CreateGaussDecayFunction("fans", req.Fans, req.RefStreamerFilter.Fans)
|
|
|
|
|
heightfunc := util.CreateGaussDecayFunction("height", req.Height, req.RefStreamerFilter.Height)
|
|
|
|
|
weightfunc := util.CreateGaussDecayFunction("weight", req.Weight, req.RefStreamerFilter.Weight)
|
2024-08-20 17:35:50 +08:00
|
|
|
|
wechatcoinpricefunc := util.CreateGaussDecayFunction("wechat_coin_price", req.WechatCoinPrice, req.RefStreamerFilter.WechatCoinPrice)
|
|
|
|
|
zoneadmissionpricefunc := util.CreateGaussDecayFunction("zone_admission_price", req.ZoneAdmissionPrice, req.RefStreamerFilter.ZoneAdmissionPrice)
|
2024-08-16 06:14:55 +08:00
|
|
|
|
|
|
|
|
|
if agefunc != nil {
|
|
|
|
|
funcScoreQuery.Add(matchall, agefunc)
|
2024-08-22 12:19:01 +08:00
|
|
|
|
fullscore += req.RefStreamerFilter.Age.Weight
|
2024-08-16 06:14:55 +08:00
|
|
|
|
}
|
|
|
|
|
if fansfunc != nil {
|
|
|
|
|
funcScoreQuery.Add(matchall, fansfunc)
|
2024-08-22 12:19:01 +08:00
|
|
|
|
fullscore += req.RefStreamerFilter.Fans.Weight
|
2024-08-16 06:14:55 +08:00
|
|
|
|
}
|
|
|
|
|
if heightfunc != nil {
|
|
|
|
|
funcScoreQuery.Add(matchall, heightfunc)
|
2024-08-22 12:19:01 +08:00
|
|
|
|
fullscore += req.RefStreamerFilter.Height.Weight
|
2024-08-16 06:14:55 +08:00
|
|
|
|
}
|
|
|
|
|
if weightfunc != nil {
|
|
|
|
|
funcScoreQuery.Add(matchall, weightfunc)
|
2024-08-22 12:19:01 +08:00
|
|
|
|
fullscore += req.RefStreamerFilter.Weight.Weight
|
2024-08-16 06:14:55 +08:00
|
|
|
|
}
|
2024-08-20 17:35:50 +08:00
|
|
|
|
if wechatcoinpricefunc != nil {
|
|
|
|
|
funcScoreQuery.Add(matchall, wechatcoinpricefunc)
|
2024-08-22 12:19:01 +08:00
|
|
|
|
fullscore += req.RefStreamerFilter.WechatCoinPrice.Weight
|
2024-08-20 17:35:50 +08:00
|
|
|
|
}
|
|
|
|
|
if zoneadmissionpricefunc != nil {
|
|
|
|
|
funcScoreQuery.Add(matchall, zoneadmissionpricefunc)
|
2024-08-22 12:19:01 +08:00
|
|
|
|
fullscore += req.RefStreamerFilter.ZoneAdmissionPrice.Weight
|
2024-08-20 17:35:50 +08:00
|
|
|
|
}
|
2024-08-15 07:07:39 +08:00
|
|
|
|
|
2024-08-16 06:14:55 +08:00
|
|
|
|
// 写入关于城市、星座的脚本打分
|
2024-08-21 22:19:56 +08:00
|
|
|
|
if req.City != nil && util.DerefString(req.City) != "" {
|
2024-08-16 06:14:55 +08:00
|
|
|
|
funcScoreQuery.AddScoreFunc(util.CreateKeywordScriptScoreFunction("city", util.DerefString(req.City), req.RefStreamerFilter.CityWeight))
|
2024-08-22 12:19:01 +08:00
|
|
|
|
fullscore += req.RefStreamerFilter.CityWeight
|
2024-08-15 07:07:39 +08:00
|
|
|
|
}
|
2024-08-21 22:19:56 +08:00
|
|
|
|
if req.Constellation != nil && util.DerefString(req.Constellation) != "" {
|
2024-08-16 06:14:55 +08:00
|
|
|
|
funcScoreQuery.AddScoreFunc(util.CreateKeywordScriptScoreFunction("constellation", util.DerefString(req.Constellation), req.RefStreamerFilter.ConstellationWeight))
|
2024-08-22 12:19:01 +08:00
|
|
|
|
fullscore += req.RefStreamerFilter.ConstellationWeight
|
2024-08-15 07:07:39 +08:00
|
|
|
|
}
|
2024-08-16 06:14:55 +08:00
|
|
|
|
|
|
|
|
|
// 写入关于是否7日内有更新的脚本打分
|
2024-08-15 07:07:39 +08:00
|
|
|
|
if req.IsActiveWithinAWeek != nil {
|
|
|
|
|
isActiveWithinAWeek := util.DerefInt64(req.IsActiveWithinAWeek)
|
|
|
|
|
if isActiveWithinAWeek == 1 {
|
|
|
|
|
daystartBeforeAWeek := util.GetDayStartTimeStamp(time.Now()) - 7*86400 // 这是7日前的日始整点时间戳
|
2024-08-16 06:14:55 +08:00
|
|
|
|
scriptStr := "doc['%s'].value >= %d ? %f : %f"
|
|
|
|
|
script := elastic.NewScript(fmt.Sprintf(scriptStr, "last_zone_moment_create_day_start", daystartBeforeAWeek, 1.0, 0.0))
|
|
|
|
|
funcScoreQuery.AddScoreFunc(elastic.NewScriptFunction(script).Weight(req.RefStreamerFilter.IsActiveWithinAWeekWeight))
|
2024-08-22 12:19:01 +08:00
|
|
|
|
fullscore += req.RefStreamerFilter.IsActiveWithinAWeekWeight
|
2024-08-15 07:07:39 +08:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2024-08-16 06:14:55 +08:00
|
|
|
|
// 按得分倒序排列
|
|
|
|
|
sortClause := elastic.NewScoreSort().Desc()
|
|
|
|
|
|
2024-08-16 07:06:12 +08:00
|
|
|
|
source, _ := funcScoreQuery.Source()
|
|
|
|
|
logger.Info(util.ToJson(source))
|
|
|
|
|
|
2024-08-16 06:14:55 +08:00
|
|
|
|
res, err := es.clientMix.Search(es.getIndexStreamerAcct()).Query(funcScoreQuery).From(req.Offset).Size(req.Limit).SortBy(sortClause).Do(ctx)
|
2024-08-15 07:07:39 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("Search %v fail, err: %v", IndexStreamerAcct, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, hit := range res.Hits.Hits {
|
|
|
|
|
streameracct := &dbstruct.EsStreamerAcct{}
|
|
|
|
|
err = json.Unmarshal(hit.Source, streameracct)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("json Unmarshal fail, err: %v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
list = append(list, streameracct)
|
2024-08-16 06:14:55 +08:00
|
|
|
|
scorelist = append(scorelist, util.DerefFloat64(hit.Score))
|
2024-08-15 07:07:39 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return
|
|
|
|
|
}
|