package dao import ( "encoding/json" "fmt" "service/app/mix/conf" "service/bizcommon/util" "service/dbstruct" "service/library/elasticsearchdb" "service/library/logger" "time" "unicode" "service/api/consts" streameracctproto "service/api/proto/streamer_acct/proto" "github.com/gin-gonic/gin" "github.com/olivere/elastic/v7" ) const ( EsEnvProd = 0 EsEnvTest = 1 EsEnvLocal = 2 ) type ElasticSearch struct { clientMix *elastic.Client } func NewElasticSearch(cfg *conf.ConfigSt) (es *ElasticSearch, err error) { es = new(ElasticSearch) es.clientMix, err = elasticsearchdb.NewElasticSearchClient(cfg.ElasticSearch) if err != nil { logger.Error("NewElasticSearchClient fail, cfg: %v, err: %v", util.ToJson(cfg.ElasticSearch), err) return } return } const ( IndexStreamerAcct = "new_streamer_acct" TypeStreamerAcct = "_doc" ) func (es *ElasticSearch) getIndexStreamerAcct() string { return IndexStreamerAcct } func (es *ElasticSearch) CreateStreamerAcct(ctx *gin.Context, streameraccts []*dbstruct.EsStreamerAcct) error { bulk := es.clientMix.Bulk().Index(es.getIndexStreamerAcct()).Refresh("true") for _, streameracct := range streameraccts { bulk.Add(elastic.NewBulkCreateRequest().Id(fmt.Sprint(streameracct.Mid)).Doc(streameracct)) } _, err := bulk.Do(ctx) return err } func (es *ElasticSearch) UpdateStreamerAcct(ctx *gin.Context, streameracct *dbstruct.EsStreamerAcct) error { _, err := es.clientMix.Update().Index(es.getIndexStreamerAcct()).Id(fmt.Sprint(streameracct.Mid)).Doc(map[string]any{ "name": streameracct.Name, "pinyin": streameracct.PinYin, "ut": streameracct.Ut, }).Do(ctx) return err } func (es *ElasticSearch) UpdateStreamerAcctSelectively(ctx *gin.Context, streameracct *dbstruct.EsStreamerAcctUpdater) error { _, err := es.clientMix.Update().Index(es.getIndexStreamerAcct()).Id(fmt.Sprint(streameracct.GetMid())).Doc(util.EntityToM(streameracct)).Do(ctx) return err } func (es *ElasticSearch) UpdateStreamerAcctSelectivelyByMids(ctx *gin.Context, streameracct *dbstruct.EsStreamerAcctUpdater, mids []int64) error { mp := util.EntityToM(streameracct) bulk := es.clientMix.Bulk().Index(es.getIndexStreamerAcct()).Refresh("true") for _, mid := range mids { bulk.Add(elastic.NewBulkUpdateRequest().Id(fmt.Sprint(mid)).Doc(mp)) } _, err := bulk.Do(ctx) return err } func (es *ElasticSearch) GetStreamerAcctListFuzzilyByName(ctx *gin.Context, req *streameracctproto.OpListFuzzilyByNameReq) (list []*dbstruct.EsStreamerAcct, err error) { list = make([]*dbstruct.EsStreamerAcct, 0) var nameClause elastic.Query var pinyinClause elastic.Query // 查询文字长度小于短语匹配的,进行Term匹配,否则进行短语匹配 len := 0 for _, char := range req.Name { if unicode.Is(unicode.Han, char) { len++ } else { len++ } } if len < consts.StreamerAcct_MatchPhraseLen { nameClause = elastic.NewTermQuery("name.short_char", req.Name) pinyinClause = elastic.NewTermQuery("pinyin.short_char", req.Name) } else { nameClause = elastic.NewMatchPhraseQuery("name.long_char", req.Name) pinyinClause = elastic.NewMatchPhraseQuery("pinyin.long_char", req.Name) } orClause := elastic.NewBoolQuery().Should(nameClause, pinyinClause) delFlagClause := elastic.NewMatchQuery("del_flag", 0) mustnotClause := elastic.NewTermsQueryFromStrings("_id", req.BlockedFromBeingSearchedList...) query := elastic.NewBoolQuery().Must(orClause, delFlagClause).MustNot(mustnotClause) res, err := es.clientMix.Search(es.getIndexStreamerAcct()).Query(query).From(req.Offset).Size(req.Limit).Sort("ct", true).Do(ctx) if err != nil { logger.Error("Search %v fail, err: %v", IndexStreamerAcct, err) return } for _, hit := range res.Hits.Hits { streameracct := &dbstruct.EsStreamerAcct{} err = json.Unmarshal(hit.Source, streameracct) if err != nil { logger.Error("json Unmarshal fail, err: %v", err) return } list = append(list, streameracct) } return } func (es *ElasticSearch) GetStreamerAcctListFuzzilyByUserId(ctx *gin.Context, req *streameracctproto.OpListFuzzilyByUserIdReq) (list []*dbstruct.EsStreamerAcct, err error) { list = make([]*dbstruct.EsStreamerAcct, 0) var nameClause elastic.Query var userIdStringClause elastic.Query // 查询文字长度小于短语匹配的,进行Term匹配,否则进行短语匹配 len := 0 for _, char := range req.UserIdString { if unicode.Is(unicode.Han, char) { len++ } else { len++ } } if len < consts.StreamerAcct_MatchPhraseLen { nameClause = elastic.NewTermQuery("name.short_char", req.UserIdString) userIdStringClause = elastic.NewTermQuery("user_id_string.short_char", req.UserIdString) } else { nameClause = elastic.NewMatchPhraseQuery("name.long_char", req.UserIdString) userIdStringClause = elastic.NewMatchPhraseQuery("user_id_string.long_char", req.UserIdString) } orClause := elastic.NewBoolQuery().Should(nameClause, userIdStringClause) delFlagClause := elastic.NewMatchQuery("del_flag", 0) mustnotClause := elastic.NewTermsQueryFromStrings("_id", req.BlockedFromBeingSearchedList...) query := elastic.NewBoolQuery().Must(orClause, delFlagClause).MustNot(mustnotClause) res, err := es.clientMix.Search(es.getIndexStreamerAcct()).Query(query).From(req.Offset).Size(req.Limit).Sort("ct", true).Do(ctx) if err != nil { logger.Error("Search %v fail, err: %v", IndexStreamerAcct, err) return } for _, hit := range res.Hits.Hits { streameracct := &dbstruct.EsStreamerAcct{} err = json.Unmarshal(hit.Source, streameracct) if err != nil { logger.Error("json Unmarshal fail, err: %v", err) return } list = append(list, streameracct) } return } func (es *ElasticSearch) FilterStreamerAcctList(ctx *gin.Context, req *streameracctproto.OpFilterReq) (list []*dbstruct.EsStreamerAcct, scorelist []float64, fullscore float64, err error) { list = make([]*dbstruct.EsStreamerAcct, 0) scorelist = make([]float64, 0) fullscore = 0 // 查询范围 delFlagClause := elastic.NewMatchQuery("del_flag", 0) mustnotClause := elastic.NewTermsQueryFromStrings("_id", req.BlockedFromBeingSearchedList...) condition := elastic.NewBoolQuery().Must(delFlagClause).MustNot(mustnotClause).Boost(1.0) // 建立得分机制 funcScoreQuery := elastic.NewFunctionScoreQuery().Query(condition) funcScoreQuery.ScoreMode("sum") funcScoreQuery.BoostMode("replace") funcScoreQuery.MinScore(0) funcScoreQuery.MaxBoost(3.4028235e38) funcScoreQuery.Boost(1.0) // 写入关于年龄、粉丝量、身高、体重、微信价格、空间价格的高斯衰减打分 matchall := elastic.NewMatchAllQuery().Boost(1.0) agefunc := util.CreateGaussDecayFunction("age", req.Age, req.RefStreamerFilter.Age) fansfunc := util.CreateGaussDecayFunction("fans", req.Fans, req.RefStreamerFilter.Fans) heightfunc := util.CreateGaussDecayFunction("height", req.Height, req.RefStreamerFilter.Height) weightfunc := util.CreateGaussDecayFunction("weight", req.Weight, req.RefStreamerFilter.Weight) wechatcoinpricefunc := util.CreateGaussDecayFunction("wechat_coin_price", req.WechatCoinPrice, req.RefStreamerFilter.WechatCoinPrice) zoneadmissionpricefunc := util.CreateGaussDecayFunction("zone_admission_price", req.ZoneAdmissionPrice, req.RefStreamerFilter.ZoneAdmissionPrice) if agefunc != nil { funcScoreQuery.Add(matchall, agefunc) fullscore += req.RefStreamerFilter.Age.Weight } if fansfunc != nil { funcScoreQuery.Add(matchall, fansfunc) fullscore += req.RefStreamerFilter.Fans.Weight } if heightfunc != nil { funcScoreQuery.Add(matchall, heightfunc) fullscore += req.RefStreamerFilter.Height.Weight } if weightfunc != nil { funcScoreQuery.Add(matchall, weightfunc) fullscore += req.RefStreamerFilter.Weight.Weight } if wechatcoinpricefunc != nil { funcScoreQuery.Add(matchall, wechatcoinpricefunc) fullscore += req.RefStreamerFilter.WechatCoinPrice.Weight } if zoneadmissionpricefunc != nil { funcScoreQuery.Add(matchall, zoneadmissionpricefunc) fullscore += req.RefStreamerFilter.ZoneAdmissionPrice.Weight } // 写入关于城市、星座的脚本打分 if req.City != nil && util.DerefString(req.City) != "" { funcScoreQuery.AddScoreFunc(util.CreateKeywordScriptScoreFunction("city", util.DerefString(req.City), req.RefStreamerFilter.CityWeight)) fullscore += req.RefStreamerFilter.CityWeight } if req.Constellation != nil && util.DerefString(req.Constellation) != "" { funcScoreQuery.AddScoreFunc(util.CreateKeywordScriptScoreFunction("constellation", util.DerefString(req.Constellation), req.RefStreamerFilter.ConstellationWeight)) fullscore += req.RefStreamerFilter.ConstellationWeight } // 写入关于是否7日内有更新的脚本打分 if req.IsActiveWithinAWeek != nil { isActiveWithinAWeek := util.DerefInt64(req.IsActiveWithinAWeek) if isActiveWithinAWeek == 1 { daystartBeforeAWeek := util.GetDayStartTimeStamp(time.Now()) - 7*86400 // 这是7日前的日始整点时间戳 scriptStr := "doc['%s'].value >= %d ? %f : %f" script := elastic.NewScript(fmt.Sprintf(scriptStr, "last_zone_moment_create_day_start", daystartBeforeAWeek, 1.0, 0.0)) funcScoreQuery.AddScoreFunc(elastic.NewScriptFunction(script).Weight(req.RefStreamerFilter.IsActiveWithinAWeekWeight)) fullscore += req.RefStreamerFilter.IsActiveWithinAWeekWeight } } // 按得分倒序排列 sortClause := elastic.NewScoreSort().Desc() source, _ := funcScoreQuery.Source() logger.Info(util.ToJson(source)) res, err := es.clientMix.Search(es.getIndexStreamerAcct()).Query(funcScoreQuery).From(req.Offset).Size(req.Limit).SortBy(sortClause).Do(ctx) if err != nil { logger.Error("Search %v fail, err: %v", IndexStreamerAcct, err) return } for _, hit := range res.Hits.Hits { streameracct := &dbstruct.EsStreamerAcct{} err = json.Unmarshal(hit.Source, streameracct) if err != nil { logger.Error("json Unmarshal fail, err: %v", err) return } list = append(list, streameracct) scorelist = append(scorelist, util.DerefFloat64(hit.Score)) } return }