From 4e0afaa067c7470c94ef02b05c8a6d849478f309 Mon Sep 17 00:00:00 2001 From: Leufolium Date: Thu, 15 Aug 2024 07:07:39 +0800 Subject: [PATCH 01/11] by Robin at 20240815 --- api/proto/streamer/proto/streamer_api.go | 23 ++++ .../streamer_acct/proto/streamer_acct_op.go | 29 ++++- app/mix/controller/init.go | 1 + app/mix/controller/streamer_api.go | 36 ++++++ app/mix/dao/elasticsearch.go | 84 ++++++++++++- app/mix/dao/mongo.go | 17 +++ app/mix/service/apiservice.go | 89 +++++++++++++ app/mix/service/logic/streamer_acct.go | 27 ++++ app/mix/service/logic/zonemoment.go | 9 ++ app/mix/service/service.go | 117 ++++++++++++++---- bizcommon/util/util.go | 20 +++ dbstruct/streamer.go | 32 +++++ dbstruct/streamer_acct.go | 40 ++++-- library/taginterceptor/crypt_test.go | 2 +- 14 files changed, 490 insertions(+), 36 deletions(-) diff --git a/api/proto/streamer/proto/streamer_api.go b/api/proto/streamer/proto/streamer_api.go index d36ae003..b496d186 100644 --- a/api/proto/streamer/proto/streamer_api.go +++ b/api/proto/streamer/proto/streamer_api.go @@ -2,6 +2,7 @@ package proto import ( "service/api/base" + "service/bizcommon/util" "service/dbstruct" ) @@ -193,3 +194,25 @@ type ApiRecommListResp struct { base.BaseResponse Data *ApiRecommListData `json:"data"` } + +// api 推荐 +type ApiFilterReq struct { + base.BaseRequest + Age *util.Int64Filter `json:"age"` + Fans *util.Int64Filter `json:"fans"` + Height *util.Int64Filter `json:"height"` + Weight *util.Int64Filter `json:"weight"` + City *string `json:"city"` + Constellation *string `json:"constellation"` + IsActiveWithinAWeek *int64 `json:"is_active_within_a_week"` +} + +type ApiFilterData struct { + StreamerList []*ApiListExtVO `json:"streamer_list"` + RecommList []*ApiListExtVO `json:"recomm_list"` +} + +type ApiFilterResp struct { + base.BaseResponse + Data *ApiFilterData `json:"data"` +} diff --git a/api/proto/streamer_acct/proto/streamer_acct_op.go b/api/proto/streamer_acct/proto/streamer_acct_op.go index 85d62830..89b034d3 100644 --- a/api/proto/streamer_acct/proto/streamer_acct_op.go +++ b/api/proto/streamer_acct/proto/streamer_acct_op.go @@ -1,6 +1,9 @@ package proto -import "service/api/base" +import ( + "service/api/base" + "service/bizcommon/util" +) // 根据UserId模糊查询(和姓名取并集) type OpListFuzzilyByUserIdReq struct { @@ -37,3 +40,27 @@ type OpListFuzzilyByNameResp struct { base.BaseResponse Data *OpListFuzzilyByNameData `json:"data"` } + +// 筛选 +type OpFilterReq struct { + base.BaseRequest + Age *util.Int64Filter `json:"age"` + Fans *util.Int64Filter `json:"fans"` + Height *util.Int64Filter `json:"height"` + Weight *util.Int64Filter `json:"weight"` + City *string `json:"city"` + Constellation *string `json:"constellation"` + IsActiveWithinAWeek *int64 `json:"is_active_within_a_week"` + Offset int `json:"offset"` + Limit int `json:"limit"` + Sort []string + BlockedFromBeingSearchedList []string +} + +type OpFilterData struct { +} + +type OpFilterResp struct { + base.BaseResponse + Data *OpListFuzzilyByNameData `json:"data"` +} diff --git a/app/mix/controller/init.go b/app/mix/controller/init.go index 540b07bd..e65448e0 100644 --- a/app/mix/controller/init.go +++ b/app/mix/controller/init.go @@ -168,6 +168,7 @@ func Init(r *gin.Engine) { apiStreamerGroup.POST("list_ext_fuzzily_by_name", middleware.JSONParamValidator(streamerproto.ApiListExtFuzzilyByNameReq{}), middleware.JwtAuthenticator(), ApiGetStreamerExtListFuzzilyByName) apiStreamerGroup.POST("list_wx_id", middleware.JSONParamValidator(streamerproto.ApiListStreamerWxIdReq{}), middleware.JwtAuthenticator(), ApiGetStreamerWxId) apiStreamerGroup.POST("recomm_list", middleware.JSONParamValidator(streamerproto.ApiRecommListReq{}), middleware.JwtAuthenticator(), ApiGetStreamerRecommList) + apiStreamerGroup.POST("filter", middleware.JSONParamValidator(streamerproto.ApiFilterReq{}), middleware.JwtAuthenticator(), ApiFilterStreamer) // 意见反馈 apiFeedbackGroup := r.Group("/api/feedback", PrepareToC()) diff --git a/app/mix/controller/streamer_api.go b/app/mix/controller/streamer_api.go index 25451200..8a439e17 100644 --- a/app/mix/controller/streamer_api.go +++ b/app/mix/controller/streamer_api.go @@ -238,3 +238,39 @@ func ApiGetStreamerRecommList(ctx *gin.Context) { } ReplyOk(ctx, data) } + +func ApiFilterStreamer(ctx *gin.Context) { + req := ctx.MustGet("client_req").(*streamerproto.ApiFilterReq) + + streamerlist, recommlist, ec := service.DefaultService.ApiFilterStreamer(ctx, req) + if ec != errcode.ErrCodeStreamerSrvOk { + logger.Error("ApiFilterStreamer fail, req: %v, ec: %v", util.ToJson(req), ec) + ReplyErrCodeMsg(ctx, ec) + return + } + + objectMediaNum := 4 // 单个主播服务总共4个媒体类 + mediaFillableList := make([]mediafiller.MediaFillable, len(streamerlist)*objectMediaNum) + for i, vo := range streamerlist { + mediaFillableList[objectMediaNum*i+0] = vo.Avatar + mediaFillableList[objectMediaNum*i+1] = vo.Cover + mediaFillableList[objectMediaNum*i+2] = vo.Shorts + mediaFillableList[objectMediaNum*i+3] = vo.Album + } + mediafiller.FillList(ctx, mediaFillableList) + + mediaFillableList = make([]mediafiller.MediaFillable, len(recommlist)*objectMediaNum) + for i, vo := range recommlist { + mediaFillableList[objectMediaNum*i+0] = vo.Avatar + mediaFillableList[objectMediaNum*i+1] = vo.Cover + mediaFillableList[objectMediaNum*i+2] = vo.Shorts + mediaFillableList[objectMediaNum*i+3] = vo.Album + } + mediafiller.FillList(ctx, mediaFillableList) + + data := &streamerproto.ApiFilterData{ + StreamerList: streamerlist, + RecommList: recommlist, + } + ReplyOk(ctx, data) +} diff --git a/app/mix/dao/elasticsearch.go b/app/mix/dao/elasticsearch.go index 9db99118..0573e060 100644 --- a/app/mix/dao/elasticsearch.go +++ b/app/mix/dao/elasticsearch.go @@ -8,6 +8,7 @@ import ( "service/dbstruct" "service/library/elasticsearchdb" "service/library/logger" + "time" "unicode" "service/api/consts" @@ -40,7 +41,7 @@ func NewElasticSearch(cfg *conf.ConfigSt) (es *ElasticSearch, err error) { } const ( - IndexStreamerAcct = "streamer_acct" + IndexStreamerAcct = "new_streamer_acct" TypeStreamerAcct = "_doc" ) @@ -66,6 +67,21 @@ func (es *ElasticSearch) UpdateStreamerAcct(ctx *gin.Context, streameracct *dbst return err } +func (es *ElasticSearch) UpdateStreamerAcctSelectively(ctx *gin.Context, streameracct *dbstruct.EsStreamerAcctUpdater) error { + _, err := es.clientMix.Update().Index(es.getIndexStreamerAcct()).Id(fmt.Sprint(streameracct.GetMid())).Doc(util.EntityToM(streameracct)).Do(ctx) + return err +} + +func (es *ElasticSearch) UpdateStreamerAcctSelectivelyByMids(ctx *gin.Context, streameracct *dbstruct.EsStreamerAcctUpdater, mids []int64) error { + mp := util.EntityToM(streameracct) + bulk := es.clientMix.Bulk().Index(es.getIndexStreamerAcct()).Refresh("true") + for _, mid := range mids { + bulk.Add(elastic.NewBulkUpdateRequest().Id(fmt.Sprint(mid)).Doc(mp)) + } + _, err := bulk.Do(ctx) + return err +} + func (es *ElasticSearch) GetStreamerAcctListFuzzilyByName(ctx *gin.Context, req *streameracctproto.OpListFuzzilyByNameReq) (list []*dbstruct.EsStreamerAcct, err error) { list = make([]*dbstruct.EsStreamerAcct, 0) @@ -157,3 +173,69 @@ func (es *ElasticSearch) GetStreamerAcctListFuzzilyByUserId(ctx *gin.Context, re return } + +func (es *ElasticSearch) FilterStreamerAcctList(ctx *gin.Context, req *streameracctproto.OpFilterReq) (list []*dbstruct.EsStreamerAcct, err error) { + + list = make([]*dbstruct.EsStreamerAcct, 0) + + delFlagClause := elastic.NewMatchQuery("del_flag", 0) + mustnotClause := elastic.NewTermsQueryFromStrings("_id", req.BlockedFromBeingSearchedList...) + + queryClauses := make([]elastic.Query, 0) + queryClauses = append(queryClauses, delFlagClause) + // 年龄 + ageClause := util.CreateEsInt64RangeQuery("age", req.Age) + if ageClause != nil { + queryClauses = append(queryClauses, ageClause) + } + // 粉丝量 + fansClause := util.CreateEsInt64RangeQuery("fans", req.Fans) + if fansClause != nil { + queryClauses = append(queryClauses, fansClause) + } + // 身高 + heightClause := util.CreateEsInt64RangeQuery("age", req.Height) + if heightClause != nil { + queryClauses = append(queryClauses, heightClause) + } + // 体重 + weightClause := util.CreateEsInt64RangeQuery("age", req.Weight) + if weightClause != nil { + queryClauses = append(queryClauses, weightClause) + } + // 城市 + if req.City != nil { + queryClauses = append(queryClauses, elastic.NewMatchQuery("city", util.DerefString(req.City))) + } + // 星座 + if req.Constellation != nil { + queryClauses = append(queryClauses, elastic.NewMatchQuery("constellation", util.DerefString(req.Constellation))) + } + // 是否7日内有更新 + if req.IsActiveWithinAWeek != nil { + isActiveWithinAWeek := util.DerefInt64(req.IsActiveWithinAWeek) + if isActiveWithinAWeek == 1 { + daystartBeforeAWeek := util.GetDayStartTimeStamp(time.Now()) - 7*86400 // 这是7日前的日始整点时间戳 + queryClauses = append(queryClauses, elastic.NewRangeQuery("last_zone_moment_create_day_start").Gte(daystartBeforeAWeek)) + } + } + + query := elastic.NewBoolQuery().Must(queryClauses...).MustNot(mustnotClause) + res, err := es.clientMix.Search(es.getIndexStreamerAcct()).Query(query).From(req.Offset).Size(req.Limit).Sort("ct", true).Do(ctx) + if err != nil { + logger.Error("Search %v fail, err: %v", IndexStreamerAcct, err) + return + } + + for _, hit := range res.Hits.Hits { + streameracct := &dbstruct.EsStreamerAcct{} + err = json.Unmarshal(hit.Source, streameracct) + if err != nil { + logger.Error("json Unmarshal fail, err: %v", err) + return + } + list = append(list, streameracct) + } + + return +} diff --git a/app/mix/dao/mongo.go b/app/mix/dao/mongo.go index 54d27643..67d4be76 100644 --- a/app/mix/dao/mongo.go +++ b/app/mix/dao/mongo.go @@ -5058,6 +5058,23 @@ func (m *Mongo) GetZidsByZoneMomentIds(ctx *gin.Context, zonemomentIds []int64) return zids, err } +func (m *Mongo) GetMidsByZoneMomentIds(ctx *gin.Context, zonemomentIds []int64) ([]int64, error) { + col := m.getColZoneMoment() + query := qmgo.M{ + "_id": qmgo.M{ + "$in": zonemomentIds, + }, + "del_flag": 0, + } + mids := make([]int64, 0) + err := col.Find(ctx, query).Distinct("mid", &mids) + if err == qmgo.ErrNoSuchDocuments { + err = nil + return make([]int64, 0), err + } + return mids, err +} + func (m *Mongo) HeadZoneMomentByIds(ctx *gin.Context, ids []int64, opType int64) error { col := m.getColZoneMoment() query := qmgo.M{ diff --git a/app/mix/service/apiservice.go b/app/mix/service/apiservice.go index 3768c48b..f6535e34 100644 --- a/app/mix/service/apiservice.go +++ b/app/mix/service/apiservice.go @@ -1447,6 +1447,19 @@ func (s *Service) ApiUpdateStreamer(ctx *gin.Context, req *streamerproto.ApiUpda logger.Error("UpdateWechat fail, mid: %v, err: %v", err) return } + // 更新es + if err := _DefaultStreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ + Mid: req.Streamer.Mid, + Age: req.Streamer.Age, + Height: req.Streamer.Height, + Weight: req.Streamer.Weight, + City: req.Streamer.City, + Constellation: req.Streamer.Constellation, + }); err != nil { + logger.Error("OpUpdateSelectively fail, req: %v, err: %v", util.ToJson(req), err) + ec = errcode.ErrCodeStreamerSrvFail + return + } return } @@ -1765,6 +1778,58 @@ func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto. return } +func (s *Service) ApiFilterStreamer(ctx *gin.Context, req *streamerproto.ApiFilterReq) (streamerlist []*streamerproto.ApiListExtVO, recommlist []*streamerproto.ApiListExtVO, ec errcode.ErrCode) { + + ec = errcode.ErrCodeStreamerSrvOk + + // 从redis中获取当前禁止被搜索的主播 + blockedlist := make([]string, 0) + err := redis.GetRedisClient().GetObject(consts.RedisStreamerPrefix+"blocked_from_being_searched_list", &blockedlist) + if err != nil && !redis.GetRedisClient().IsErrNil(err) { + logger.Error("Redis read failed : %v", err) + return + } + + //1.从主播用户表中模糊匹配所有主播信息的用户侧数据,按mid正序排序 + streameraccts, err := _DefaultStreamerAcct.OpFilterStreamerAcct(ctx, &streameracctproto.OpFilterReq{ + Age: req.Age, + Fans: req.Fans, + Height: req.Height, + Weight: req.Weight, + City: req.City, + Constellation: req.Constellation, + IsActiveWithinAWeek: req.IsActiveWithinAWeek, + Offset: 0, + Limit: 20, + Sort: []string{"_id"}, + BlockedFromBeingSearchedList: blockedlist, + }) + if err != nil { + logger.Error("Account OpFilterStreamerAcct fail, req: %v, err: %v", util.ToJson(req), err) + ec = errcode.ErrCodeAccountSrvFail + return + } + + //2.获取mids + mids := make([]int64, 0) + for _, v := range streameraccts { + mids = append(mids, v.Mid) + } + + mp, err := s.utilGetStreamerExtMapByMids(ctx, mids, consts.InterfaceType_Api) + if err != nil { + logger.Error("utilGetStreamerExtMapByMids fail, err: %v", err) + return + } + streamerlist = make([]*streamerproto.ApiListExtVO, 0) + for _, mid := range mids { + vo, _ := mp[mid].(*streamerproto.ApiListExtVO) + streamerlist = append(streamerlist, vo) + } + + return +} + // Feedback func (s *Service) ApiCreateFeedback(ctx *gin.Context, req *feedbackproto.ApiCreateReq) (ec errcode.ErrCode) { ec = errcode.ErrCodeFeedbackSrvOk @@ -3156,11 +3221,23 @@ func (s *Service) ApiDeleteZoneMoment(ctx *gin.Context, req *zonemomentproto.Api LastZoneMomentCt: goproto.Int64(0), }, }, zone.GetLastZoneMomentCt()) + if err == qmgo.ErrNoSuchDocuments { + return + } if err != nil { logger.Error("OpUpdate fail, err: %v", err) ec = errcode.ErrCodeZoneMomentSrvFail return } + err = _DefaultStreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ + Mid: zonemoment.Mid, + LastZoneMomentCreateDayStart: goproto.Int64(0), + }) + if err != nil { + logger.Error("OpUpdateSelectively fail, err: %v", err) + ec = errcode.ErrCodeZoneMomentSrvFail + return + } } else { //还有动态,看最后一条更新的动态的更新时间是否比删除的那条动态更早,若更早,则需要把空间的更新时间回拨 lastzonemoment := list[0] @@ -3171,11 +3248,23 @@ func (s *Service) ApiDeleteZoneMoment(ctx *gin.Context, req *zonemomentproto.Api LastZoneMomentCt: lastzonemoment.Ut, }, }, zone.GetLastZoneMomentCt()) + if err == qmgo.ErrNoSuchDocuments { + return + } if err != nil { logger.Error("OpUpdate fail, err: %v", err) ec = errcode.ErrCodeZoneMomentSrvFail return } + err = _DefaultStreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ + Mid: zonemoment.Mid, + LastZoneMomentCreateDayStart: goproto.Int64(util.GetDayStartTimeStamp(time.Unix(lastzonemoment.GetUt(), 0))), + }) + if err != nil { + logger.Error("OpUpdateSelectively fail, err: %v", err) + ec = errcode.ErrCodeZoneMomentSrvFail + return + } } } } diff --git a/app/mix/service/logic/streamer_acct.go b/app/mix/service/logic/streamer_acct.go index 68aa0699..70959ea5 100644 --- a/app/mix/service/logic/streamer_acct.go +++ b/app/mix/service/logic/streamer_acct.go @@ -39,6 +39,24 @@ func (p *StreamerAcct) OpUpdate(ctx *gin.Context, streameracct *dbstruct.EsStrea return nil } +func (p *StreamerAcct) OpUpdateSelectively(ctx *gin.Context, streameracct *dbstruct.EsStreamerAcctUpdater) error { + err := p.store.UpdateStreamerAcctSelectively(ctx, streameracct) + if err != nil { + logger.Error("UpdateStreamerAcctSelectively fail, err: %v", err) + return err + } + return nil +} + +func (p *StreamerAcct) OpUpdateSelectivelyByMids(ctx *gin.Context, streameracct *dbstruct.EsStreamerAcctUpdater, mids []int64) error { + err := p.store.UpdateStreamerAcctSelectivelyByMids(ctx, streameracct, mids) + if err != nil { + logger.Error("UpdateStreamerAcctSelectivelyByMids fail, err: %v", err) + return err + } + return nil +} + func (p *StreamerAcct) OpListStreamerAcctFuzzilyByUserId(ctx *gin.Context, req *streameracctproto.OpListFuzzilyByUserIdReq) ([]*dbstruct.EsStreamerAcct, error) { list, err := p.store.GetStreamerAcctListFuzzilyByUserId(ctx, req) if err != nil { @@ -56,3 +74,12 @@ func (p *StreamerAcct) OpListStreamerAcctFuzzilyByName(ctx *gin.Context, req *st } return list, nil } + +func (p *StreamerAcct) OpFilterStreamerAcct(ctx *gin.Context, req *streameracctproto.OpFilterReq) ([]*dbstruct.EsStreamerAcct, error) { + list, err := p.store.FilterStreamerAcctList(ctx, req) + if err != nil { + logger.Error("FilterStreamerAcctList fail, err: %v", err) + return nil, err + } + return list, nil +} diff --git a/app/mix/service/logic/zonemoment.go b/app/mix/service/logic/zonemoment.go index 9810ef7c..499d82d2 100644 --- a/app/mix/service/logic/zonemoment.go +++ b/app/mix/service/logic/zonemoment.go @@ -170,6 +170,15 @@ func (p *ZoneMoment) GetZidsByZoneMomentIds(ctx *gin.Context, zonemomentIds []in return zids, err } +func (p *ZoneMoment) GetMidsByZoneMomentIds(ctx *gin.Context, zonemomentIds []int64) ([]int64, error) { + mids, err := p.store.GetMidsByZoneMomentIds(ctx, zonemomentIds) + if err != nil { + logger.Error("GetMidsByZoneMomentIds fail, err: %v", err) + return nil, err + } + return mids, err +} + func (p *ZoneMoment) OpHeadByIds(ctx *gin.Context, ids []int64, opType int64) error { err := p.store.HeadZoneMomentByIds(ctx, ids, opType) if err != nil { diff --git a/app/mix/service/service.go b/app/mix/service/service.go index 591f3ad6..1888d466 100644 --- a/app/mix/service/service.go +++ b/app/mix/service/service.go @@ -1878,24 +1878,29 @@ func (s *Service) OpApproveStreamerAuthApproval(ctx *gin.Context, req *streamera } // 主播账户表新增数据 - accountList, err := _DefaultAccount.OpListByMids(ctx, &accountproto.OpListByMidsReq{ - Mids: midList, - }) + accountMp, err := _DefaultAccount.GetAccountMapByMids(ctx, midList) if err != nil { logger.Error("_DefaultAccount OpListByMids fail, req: %v, err: %v", util.ToJson(req), err) ec = errcode.ErrCodeAccountSrvFail return } streamerAccts := make([]*dbstruct.EsStreamerAcct, 0) - for _, acct := range accountList { + for _, streamer := range streamerList { + acct := accountMp[streamer.GetMid()] streamerAccts = append(streamerAccts, &dbstruct.EsStreamerAcct{ - Mid: acct.GetMid(), - Name: acct.GetName(), - UserIdString: acct.GetUserIdString(), - PinYin: strings.Join(pinyin.LazyConvert(acct.GetName(), nil), ""), - Ct: acct.GetCt(), - Ut: acct.GetUt(), - DelFlag: acct.GetDelFlag(), + Mid: acct.GetMid(), + Name: acct.GetName(), + UserIdString: acct.GetUserIdString(), + PinYin: strings.Join(pinyin.LazyConvert(acct.GetName(), nil), ""), + Age: streamer.GetAge(), + Height: streamer.GetHeight(), + Weight: streamer.GetWeight(), + City: streamer.GetCity(), + Constellation: streamer.GetConstellation(), + LastZoneMomentCreateDayStart: 0, + Ct: acct.GetCt(), + Ut: acct.GetUt(), + DelFlag: acct.GetDelFlag(), }) } err = _DefaultStreamerAcct.OpCreate(ctx, streamerAccts) @@ -2170,31 +2175,19 @@ func (s *Service) OpApproveStreamerAuthApprovalDetails(ctx *gin.Context, req *st return } } - accounts, err := _DefaultAccount.OpListByMids(ctx, &accountproto.OpListByMidsReq{ - Mids: mids, - }) + accountMp, err := _DefaultAccount.GetAccountMapByMids(ctx, mids) if err != nil { logger.Error("_DefaultAccount OpListByMids fail, req: %v, err: %v", util.ToJson(req), err) ec = errcode.ErrCodeAccountSrvFail return } - // 预填写主播账户表数据 - streamerAccts := make([]*dbstruct.EsStreamerAcct, 0) - for _, acct := range accounts { + // 判断主播是否是准主播 + for _, acct := range accountMp { if acct.GetRole() != consts.StreamerToBe { logger.Error("mid: %v is not a streamer-to-be, req: %v", acct.GetMid(), util.ToJson(req)) ec = errcode.ErrCodeStreamerAuthApprovalDetailsNotAStreamerToBe return } - streamerAccts = append(streamerAccts, &dbstruct.EsStreamerAcct{ - Mid: acct.GetMid(), - Name: acct.GetName(), - UserIdString: acct.GetUserIdString(), - PinYin: strings.Join(pinyin.LazyConvert(acct.GetName(), nil), ""), - Ct: acct.GetCt(), - Ut: acct.GetUt(), - DelFlag: acct.GetDelFlag(), - }) } //2.更新状态 @@ -2231,6 +2224,8 @@ func (s *Service) OpApproveStreamerAuthApprovalDetails(ctx *gin.Context, req *st //3.若是审批通过,将审批表的信息合并进主播表,并更新account表里的角色信息主播 if util.DerefInt64(req.ApproveStatus) == consts.StreamerAuthApprovalDetailsApprove_Passed { + + streamerAccts := make([]*dbstruct.EsStreamerAcct, 0) for _, streamerAuthApprovalDetails := range list { streamer := &dbstruct.Streamer{ Mid: streamerAuthApprovalDetails.Mid, @@ -2250,6 +2245,24 @@ func (s *Service) OpApproveStreamerAuthApprovalDetails(ctx *gin.Context, req *st IsHided: goproto.Int64(0), } + // es搜索引擎数据 + acct := accountMp[streamer.GetMid()] + streamerAccts = append(streamerAccts, &dbstruct.EsStreamerAcct{ + Mid: acct.GetMid(), + Name: acct.GetName(), + UserIdString: acct.GetUserIdString(), + PinYin: strings.Join(pinyin.LazyConvert(acct.GetName(), nil), ""), + Age: streamer.GetAge(), + Height: streamer.GetHeight(), + Weight: streamer.GetWeight(), + City: streamer.GetCity(), + Constellation: streamer.GetConstellation(), + LastZoneMomentCreateDayStart: 0, + Ct: acct.GetCt(), + Ut: acct.GetUt(), + DelFlag: acct.GetDelFlag(), + }) + // 更新user_vas err = _DefaultVas.UpdateWechat(ctx, &vasproto.UpdateWechatReq{ BaseRequest: base.BaseRequest{ @@ -2356,6 +2369,19 @@ func (s *Service) OpUpdateStreamer(ctx *gin.Context, req *streamerproto.OpUpdate ec = errcode.ErrCodeStreamerSrvFail return } + // 更新es + if err := _DefaultStreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ + Mid: req.Streamer.Mid, + Age: req.Streamer.Age, + Height: req.Streamer.Height, + Weight: req.Streamer.Weight, + City: req.Streamer.City, + Constellation: req.Streamer.Constellation, + }); err != nil { + logger.Error("OpUpdateSelectively fail, req: %v, err: %v", util.ToJson(req), err) + ec = errcode.ErrCodeStreamerSrvFail + return + } return } @@ -4164,11 +4190,23 @@ func (s *Service) OpDeleteZoneMoment(ctx *gin.Context, req *zonemomentproto.OpDe LastZoneMomentCt: goproto.Int64(0), }, }, zone.GetLastZoneMomentCt()) + if err == qmgo.ErrNoSuchDocuments { + return + } if err != nil { logger.Error("OpUpdate fail, err: %v", err) ec = errcode.ErrCodeZoneMomentSrvFail return } + err = _DefaultStreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ + Mid: zonemoment.Mid, + LastZoneMomentCreateDayStart: goproto.Int64(0), + }) + if err != nil { + logger.Error("OpUpdateSelectively fail, err: %v", err) + ec = errcode.ErrCodeZoneMomentSrvFail + return + } } else { //还有动态,看最后一条更新的动态的更新时间是否比删除的那条动态更早,若更早,则需要把空间的更新时间回拨 lastzonemoment := list[0] @@ -4179,11 +4217,23 @@ func (s *Service) OpDeleteZoneMoment(ctx *gin.Context, req *zonemomentproto.OpDe LastZoneMomentCt: lastzonemoment.Ut, }, }, zone.GetLastZoneMomentCt()) + if err == qmgo.ErrNoSuchDocuments { + return + } if err != nil { logger.Error("OpUpdate fail, err: %v", err) ec = errcode.ErrCodeZoneMomentSrvFail return } + err = _DefaultStreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ + Mid: zonemoment.Mid, + LastZoneMomentCreateDayStart: lastzonemoment.Ut, + }) + if err != nil { + logger.Error("OpUpdateSelectively fail, err: %v", err) + ec = errcode.ErrCodeZoneMomentSrvFail + return + } } } } @@ -4301,6 +4351,21 @@ func (s *Service) OpReviewZoneMoment(ctx *gin.Context, req *zonemomentproto.OpRe ec = errcode.ErrCodeZoneMomentSrvFail return } + daystart := util.GetDayStartTimeStamp(time.Unix(time.Now().Unix(), 0)) + mids, err := _DefaultZoneMoment.GetMidsByZoneMomentIds(ctx, req.ZoneMomentIds) + if err != nil { + logger.Error("_DefaultZoneMoment GetMidsByZoneMomentIds fail, req: %v, err: %v", util.ToJson(req), err) + ec = errcode.ErrCodeZoneMomentSrvFail + return + } + err = _DefaultStreamerAcct.OpUpdateSelectivelyByMids(ctx, &dbstruct.EsStreamerAcctUpdater{ + LastZoneMomentCreateDayStart: goproto.Int64(daystart), + }, mids) + if err != nil { + logger.Error("_DefaultStreamerAcct OpUpdateSelectivelyByMids fail, req: %v, err: %v", util.ToJson(req), err) + ec = errcode.ErrCodeZoneMomentSrvFail + return + } // 增加空间内统计总数 for _, id := range req.ZoneMomentIds { diff --git a/bizcommon/util/util.go b/bizcommon/util/util.go index d8899cbb..1d54396c 100644 --- a/bizcommon/util/util.go +++ b/bizcommon/util/util.go @@ -16,6 +16,7 @@ import ( "time" "unsafe" + "github.com/olivere/elastic/v7" "github.com/qiniu/qmgo" ) @@ -373,3 +374,22 @@ func GetLastLessOrEqualForFloat64(arr []float64, target float64) int { return result } + +type Int64Filter struct { + LowerBound *int64 `json:"lower_bound" bson:"lower_bound"` + UpperBound *int64 `json:"upper_bound" bson:"upper_bound"` +} + +func CreateEsInt64RangeQuery(name string, filter *Int64Filter) *elastic.RangeQuery { + if filter == nil { + return nil + } + query := elastic.NewRangeQuery(name) + if filter.LowerBound != nil { + query.Gte(DerefInt64(filter.LowerBound)) + } + if filter.UpperBound != nil { + query.Lte(DerefInt64(filter.UpperBound)) + } + return query +} diff --git a/dbstruct/streamer.go b/dbstruct/streamer.go index 7441cb78..69907dfc 100644 --- a/dbstruct/streamer.go +++ b/dbstruct/streamer.go @@ -57,3 +57,35 @@ func (p *Streamer) GetAutoResponseMessage() string { } return *p.AutoResponseMessage } + +func (p *Streamer) GetAge() int64 { + if p == nil || p.Age == nil { + return 0 + } + return *p.Age +} + +func (p *Streamer) GetHeight() int64 { + if p == nil || p.Height == nil { + return 0 + } + return *p.Height +} +func (p *Streamer) GetWeight() int64 { + if p == nil || p.Weight == nil { + return 0 + } + return *p.Weight +} +func (p *Streamer) GetConstellation() string { + if p == nil || p.Constellation == nil { + return "" + } + return *p.Constellation +} +func (p *Streamer) GetCity() string { + if p == nil || p.City == nil { + return "" + } + return *p.City +} diff --git a/dbstruct/streamer_acct.go b/dbstruct/streamer_acct.go index 10a1bd9a..de502b27 100644 --- a/dbstruct/streamer_acct.go +++ b/dbstruct/streamer_acct.go @@ -1,11 +1,37 @@ package dbstruct type EsStreamerAcct struct { - Mid int64 `json:"id"` // 用户表Id - Name string `json:"name"` // 用户名 - UserIdString string `json:"user_id_string"` // string型user_id,为模糊匹配设置 - PinYin string `json:"pinyin"` // 拼音 - Ct int64 `json:"ct"` // 创建时间 - Ut int64 `json:"ut"` // 更新时间 - DelFlag int64 `json:"del_flag"` // 删除标记,0-否,1-是 + Mid int64 `json:"id"` // 用户表Id + Name string `json:"name"` // 用户名 + UserIdString string `json:"user_id_string"` // string型user_id,为模糊匹配设置 + PinYin string `json:"pinyin"` // 拼音 + Age int64 `json:"age"` // 年龄 + Height int64 `json:"height"` // 身高 + Weight int64 `json:"weight"` // 体重 + City string `json:"city"` // 所在城市 + Constellation string `json:"constellation"` // 星座 + LastZoneMomentCreateDayStart int64 `json:"last_zone_moment_create_day_start"` // 最后空间动态创建日始整点 + Ct int64 `json:"ct"` // 创建时间 + Ut int64 `json:"ut"` // 更新时间 + DelFlag int64 `json:"del_flag"` // 删除标记,0-否,1-是 +} + +type EsStreamerAcctUpdater struct { + Mid *int64 `json:"id" bson:"_id"` // 用户表Id + Name *string `json:"name" bson:"name"` // 用户名 + UserIdString *string `json:"user_id_string" bson:"user_id_string"` // string型user_id,为模糊匹配设置 + PinYin *string `json:"pinyin" bson:"pinyin"` // 拼音 + Age *int64 `json:"age" bson:"age"` // 年龄 + Height *int64 `json:"height" bson:"height"` // 身高 + Weight *int64 `json:"weight" bson:"weight"` // 体重 + City *string `json:"city" bson:"city"` // 所在城市 + Constellation *string `json:"constellation" bson:"constellation"` // 星座 + LastZoneMomentCreateDayStart *int64 `json:"last_zone_moment_create_day_start" bson:"last_zone_moment_create_day_start"` // 最后空间动态创建日始整点 +} + +func (p *EsStreamerAcctUpdater) GetMid() int64 { + if p == nil || p.Mid == nil { + return 0 + } + return *p.Mid } diff --git a/library/taginterceptor/crypt_test.go b/library/taginterceptor/crypt_test.go index 4f10daf0..6a46bbd8 100644 --- a/library/taginterceptor/crypt_test.go +++ b/library/taginterceptor/crypt_test.go @@ -32,7 +32,7 @@ func Test(t *testing.T) { // fmt.Println(string(phone)) // hash := mycrypto.CryptoServiceInstance().SHA256.Encrypt(phone) // fmt.Println(string(hash)) - mobilePhone := "13728423678" + mobilePhone := "15051772749" rsaBytes, _ := mycrypto.CryptoServiceInstance().AES.Encrypt([]byte(mobilePhone)) base64EncryptedBytes := make([]byte, base64.StdEncoding.EncodedLen(len(rsaBytes))) base64.StdEncoding.Encode(base64EncryptedBytes, rsaBytes) -- 2.41.0 From d4cff8cc016d2c493ced861102fd93aa7dc3e9f6 Mon Sep 17 00:00:00 2001 From: Leufolium Date: Fri, 16 Aug 2024 06:14:55 +0800 Subject: [PATCH 02/11] by Robin at 20240816 --- api/consts/consts.go | 1 + .../streamer_acct/proto/streamer_acct_op.go | 2 + apollostruct/streamer_filter.go | 13 +++ app/mix/dao/elasticsearch.go | 76 ++++++++----- app/mix/dao/mongo.go | 34 +++--- app/mix/service/apiservice.go | 54 +++------- app/mix/service/logic/es_decorated.go | 100 ++++++++++++++++++ app/mix/service/logic/zone.go | 9 ++ app/mix/service/logic/zonemoment.go | 9 -- app/mix/service/service.go | 71 +++---------- bizcommon/util/util.go | 30 +++++- 11 files changed, 246 insertions(+), 153 deletions(-) create mode 100644 apollostruct/streamer_filter.go create mode 100644 app/mix/service/logic/es_decorated.go diff --git a/api/consts/consts.go b/api/consts/consts.go index edd30aca..4aa18397 100644 --- a/api/consts/consts.go +++ b/api/consts/consts.go @@ -65,6 +65,7 @@ const ( ClassResultMapKey = "class_result_map" DefaultZoneTextKey = "default_zone_text" AuditTaskCollectionReflectKey = "audit_task_collection_reflect" + StreamerFilterKey = "streamer_filter" ) // del_flag diff --git a/api/proto/streamer_acct/proto/streamer_acct_op.go b/api/proto/streamer_acct/proto/streamer_acct_op.go index 89b034d3..4d714be0 100644 --- a/api/proto/streamer_acct/proto/streamer_acct_op.go +++ b/api/proto/streamer_acct/proto/streamer_acct_op.go @@ -2,6 +2,7 @@ package proto import ( "service/api/base" + "service/apollostruct" "service/bizcommon/util" ) @@ -55,6 +56,7 @@ type OpFilterReq struct { Limit int `json:"limit"` Sort []string BlockedFromBeingSearchedList []string + RefStreamerFilter *apollostruct.StreamerFilterCfg } type OpFilterData struct { diff --git a/apollostruct/streamer_filter.go b/apollostruct/streamer_filter.go new file mode 100644 index 00000000..bc38d979 --- /dev/null +++ b/apollostruct/streamer_filter.go @@ -0,0 +1,13 @@ +package apollostruct + +import "service/bizcommon/util" + +type StreamerFilterCfg struct { + Age *util.Int64Filter `json:"age"` + Fans *util.Int64Filter `json:"fans"` + Height *util.Int64Filter `json:"height"` + Weight *util.Int64Filter `json:"weight"` + CityWeight float64 `json:"city_weight"` + ConstellationWeight float64 `json:"constellation_weight"` + IsActiveWithinAWeekWeight float64 `json:"is_active_within_a_week_weight"` +} diff --git a/app/mix/dao/elasticsearch.go b/app/mix/dao/elasticsearch.go index 0573e060..c60ebde0 100644 --- a/app/mix/dao/elasticsearch.go +++ b/app/mix/dao/elasticsearch.go @@ -174,54 +174,75 @@ func (es *ElasticSearch) GetStreamerAcctListFuzzilyByUserId(ctx *gin.Context, re return } -func (es *ElasticSearch) FilterStreamerAcctList(ctx *gin.Context, req *streameracctproto.OpFilterReq) (list []*dbstruct.EsStreamerAcct, err error) { +func (es *ElasticSearch) FilterStreamerAcctList(ctx *gin.Context, req *streameracctproto.OpFilterReq) (list []*dbstruct.EsStreamerAcct, scorelist []float64, fullscore int64, err error) { list = make([]*dbstruct.EsStreamerAcct, 0) + scorelist = make([]float64, 0) + fullscore = 0 + // 查询范围 delFlagClause := elastic.NewMatchQuery("del_flag", 0) mustnotClause := elastic.NewTermsQueryFromStrings("_id", req.BlockedFromBeingSearchedList...) + condition := elastic.NewBoolQuery().Must(delFlagClause).MustNot(mustnotClause).Boost(1.0) - queryClauses := make([]elastic.Query, 0) - queryClauses = append(queryClauses, delFlagClause) - // 年龄 - ageClause := util.CreateEsInt64RangeQuery("age", req.Age) - if ageClause != nil { - queryClauses = append(queryClauses, ageClause) + // 建立得分机制 + funcScoreQuery := elastic.NewFunctionScoreQuery().Query(condition) + funcScoreQuery.ScoreMode("sum") + funcScoreQuery.BoostMode("replace") + funcScoreQuery.MinScore(0) + funcScoreQuery.MaxBoost(3.4028235e38) + funcScoreQuery.Boost(1.0) + + // 写入关于年龄、粉丝量、身高、体重的高斯衰减打分 + matchall := elastic.NewMatchAllQuery().Boost(1.0) + agefunc := util.CreateGaussDecayFunction("age", req.Age, req.RefStreamerFilter.Age) + fansfunc := util.CreateGaussDecayFunction("fans", req.Fans, req.RefStreamerFilter.Fans) + heightfunc := util.CreateGaussDecayFunction("height", req.Height, req.RefStreamerFilter.Height) + weightfunc := util.CreateGaussDecayFunction("weight", req.Weight, req.RefStreamerFilter.Weight) + + if agefunc != nil { + funcScoreQuery.Add(matchall, agefunc) + fullscore += 1.0 } - // 粉丝量 - fansClause := util.CreateEsInt64RangeQuery("fans", req.Fans) - if fansClause != nil { - queryClauses = append(queryClauses, fansClause) + if fansfunc != nil { + funcScoreQuery.Add(matchall, fansfunc) + fullscore += 1.0 } - // 身高 - heightClause := util.CreateEsInt64RangeQuery("age", req.Height) - if heightClause != nil { - queryClauses = append(queryClauses, heightClause) + if heightfunc != nil { + funcScoreQuery.Add(matchall, heightfunc) + fullscore += 1.0 } - // 体重 - weightClause := util.CreateEsInt64RangeQuery("age", req.Weight) - if weightClause != nil { - queryClauses = append(queryClauses, weightClause) + if weightfunc != nil { + funcScoreQuery.Add(matchall, weightfunc) + fullscore += 1.0 } - // 城市 + + // 写入关于城市、星座的脚本打分 if req.City != nil { - queryClauses = append(queryClauses, elastic.NewMatchQuery("city", util.DerefString(req.City))) + funcScoreQuery.AddScoreFunc(util.CreateKeywordScriptScoreFunction("city", util.DerefString(req.City), req.RefStreamerFilter.CityWeight)) + fullscore += 1.0 } - // 星座 if req.Constellation != nil { - queryClauses = append(queryClauses, elastic.NewMatchQuery("constellation", util.DerefString(req.Constellation))) + funcScoreQuery.AddScoreFunc(util.CreateKeywordScriptScoreFunction("constellation", util.DerefString(req.Constellation), req.RefStreamerFilter.ConstellationWeight)) + fullscore += 1.0 } - // 是否7日内有更新 + + // 写入关于是否7日内有更新的脚本打分 if req.IsActiveWithinAWeek != nil { isActiveWithinAWeek := util.DerefInt64(req.IsActiveWithinAWeek) if isActiveWithinAWeek == 1 { daystartBeforeAWeek := util.GetDayStartTimeStamp(time.Now()) - 7*86400 // 这是7日前的日始整点时间戳 - queryClauses = append(queryClauses, elastic.NewRangeQuery("last_zone_moment_create_day_start").Gte(daystartBeforeAWeek)) + scriptStr := "doc['%s'].value >= %d ? %f : %f" + script := elastic.NewScript(fmt.Sprintf(scriptStr, "last_zone_moment_create_day_start", daystartBeforeAWeek, 1.0, 0.0)) + funcScoreQuery.AddScoreFunc(elastic.NewScriptFunction(script).Weight(req.RefStreamerFilter.IsActiveWithinAWeekWeight)) + fullscore += 1.0 } } - query := elastic.NewBoolQuery().Must(queryClauses...).MustNot(mustnotClause) - res, err := es.clientMix.Search(es.getIndexStreamerAcct()).Query(query).From(req.Offset).Size(req.Limit).Sort("ct", true).Do(ctx) + // 按得分倒序排列 + sortClause := elastic.NewScoreSort().Desc() + + res, err := es.clientMix.Search(es.getIndexStreamerAcct()).Query(funcScoreQuery).From(req.Offset).Size(req.Limit).SortBy(sortClause).Do(ctx) if err != nil { logger.Error("Search %v fail, err: %v", IndexStreamerAcct, err) return @@ -235,6 +256,7 @@ func (es *ElasticSearch) FilterStreamerAcctList(ctx *gin.Context, req *streamera return } list = append(list, streameracct) + scorelist = append(scorelist, util.DerefFloat64(hit.Score)) } return diff --git a/app/mix/dao/mongo.go b/app/mix/dao/mongo.go index 67d4be76..fc7a69df 100644 --- a/app/mix/dao/mongo.go +++ b/app/mix/dao/mongo.go @@ -4717,6 +4717,23 @@ func (m *Mongo) GetZoneByMid(ctx *gin.Context, mid int64) (*dbstruct.Zone, error return zone, err } +func (m *Mongo) GetMidsByZids(ctx *gin.Context, zids []int64) ([]int64, error) { + col := m.getColZone() + query := qmgo.M{ + "_id": qmgo.M{ + "$in": zids, + }, + "del_flag": 0, + } + mids := make([]int64, 0) + err := col.Find(ctx, query).Distinct("mid", &mids) + if err == qmgo.ErrNoSuchDocuments { + err = nil + return make([]int64, 0), err + } + return mids, err +} + func (m *Mongo) RecordZoneStatisticsById(ctx *gin.Context, id int64, zoneMomentCount int64, imageCount int64, videoCount int64) error { col := m.getColZone() up := qmgo.M{ @@ -5058,23 +5075,6 @@ func (m *Mongo) GetZidsByZoneMomentIds(ctx *gin.Context, zonemomentIds []int64) return zids, err } -func (m *Mongo) GetMidsByZoneMomentIds(ctx *gin.Context, zonemomentIds []int64) ([]int64, error) { - col := m.getColZoneMoment() - query := qmgo.M{ - "_id": qmgo.M{ - "$in": zonemomentIds, - }, - "del_flag": 0, - } - mids := make([]int64, 0) - err := col.Find(ctx, query).Distinct("mid", &mids) - if err == qmgo.ErrNoSuchDocuments { - err = nil - return make([]int64, 0), err - } - return mids, err -} - func (m *Mongo) HeadZoneMomentByIds(ctx *gin.Context, ids []int64, opType int64) error { col := m.getColZoneMoment() query := qmgo.M{ diff --git a/app/mix/service/apiservice.go b/app/mix/service/apiservice.go index f6535e34..fd87e35d 100644 --- a/app/mix/service/apiservice.go +++ b/app/mix/service/apiservice.go @@ -1429,7 +1429,7 @@ func (s *Service) ApiUpdateStreamer(ctx *gin.Context, req *streamerproto.ApiUpda } // 执行更新 - if err := _DefaultStreamer.OpUpdate(ctx, updateReq); err != nil { + if err := _DefaultStreamerDecrtByEs.OpUpdate(ctx, updateReq); err != nil { logger.Error("OpUpdate fail, req: %v, err: %v", util.ToJson(req), err) ec = errcode.ErrCodeStreamerSrvFail return @@ -1447,19 +1447,6 @@ func (s *Service) ApiUpdateStreamer(ctx *gin.Context, req *streamerproto.ApiUpda logger.Error("UpdateWechat fail, mid: %v, err: %v", err) return } - // 更新es - if err := _DefaultStreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ - Mid: req.Streamer.Mid, - Age: req.Streamer.Age, - Height: req.Streamer.Height, - Weight: req.Streamer.Weight, - City: req.Streamer.City, - Constellation: req.Streamer.Constellation, - }); err != nil { - logger.Error("OpUpdateSelectively fail, req: %v, err: %v", util.ToJson(req), err) - ec = errcode.ErrCodeStreamerSrvFail - return - } return } @@ -1790,6 +1777,14 @@ func (s *Service) ApiFilterStreamer(ctx *gin.Context, req *streamerproto.ApiFilt return } + cfg := &apollostruct.StreamerFilterCfg{} + err = apollo.GetJson(consts.StreamerFilterKey, cfg, apollo.ApolloOpts().SetNamespace("application")) + if err != nil { + logger.Error("Apollo read failed : %v", err) + ec = errcode.ErrCodeApolloReadFail + return + } + //1.从主播用户表中模糊匹配所有主播信息的用户侧数据,按mid正序排序 streameraccts, err := _DefaultStreamerAcct.OpFilterStreamerAcct(ctx, &streameracctproto.OpFilterReq{ Age: req.Age, @@ -1803,6 +1798,7 @@ func (s *Service) ApiFilterStreamer(ctx *gin.Context, req *streamerproto.ApiFilt Limit: 20, Sort: []string{"_id"}, BlockedFromBeingSearchedList: blockedlist, + RefStreamerFilter: cfg, }) if err != nil { logger.Error("Account OpFilterStreamerAcct fail, req: %v, err: %v", util.ToJson(req), err) @@ -3215,56 +3211,34 @@ func (s *Service) ApiDeleteZoneMoment(ctx *gin.Context, req *zonemomentproto.Api // 已经没有动态,最后更新时间回退到0 if len(list) == 0 { - err = _DefaultZone.OpUpdateByIdAndLastZoneMomentCt(ctx, &zoneproto.OpUpdateReq{ + err = _DefaultZoneDecrtByEs.OpUpdateByIdAndLastZoneMomentCt(ctx, &zoneproto.OpUpdateReq{ + BaseRequest: base.BaseRequest{Mid: zonemoment.GetMid()}, Zone: &dbstruct.Zone{ Id: goproto.Int64(zid), LastZoneMomentCt: goproto.Int64(0), }, }, zone.GetLastZoneMomentCt()) - if err == qmgo.ErrNoSuchDocuments { - return - } if err != nil { logger.Error("OpUpdate fail, err: %v", err) ec = errcode.ErrCodeZoneMomentSrvFail return } - err = _DefaultStreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ - Mid: zonemoment.Mid, - LastZoneMomentCreateDayStart: goproto.Int64(0), - }) - if err != nil { - logger.Error("OpUpdateSelectively fail, err: %v", err) - ec = errcode.ErrCodeZoneMomentSrvFail - return - } } else { //还有动态,看最后一条更新的动态的更新时间是否比删除的那条动态更早,若更早,则需要把空间的更新时间回拨 lastzonemoment := list[0] if lastzonemoment.GetUt() < zonemoment.GetUt() { - err = _DefaultZone.OpUpdateByIdAndLastZoneMomentCt(ctx, &zoneproto.OpUpdateReq{ + err = _DefaultZoneDecrtByEs.OpUpdateByIdAndLastZoneMomentCt(ctx, &zoneproto.OpUpdateReq{ + BaseRequest: base.BaseRequest{Mid: zonemoment.GetMid()}, Zone: &dbstruct.Zone{ Id: goproto.Int64(zid), LastZoneMomentCt: lastzonemoment.Ut, }, }, zone.GetLastZoneMomentCt()) - if err == qmgo.ErrNoSuchDocuments { - return - } if err != nil { logger.Error("OpUpdate fail, err: %v", err) ec = errcode.ErrCodeZoneMomentSrvFail return } - err = _DefaultStreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ - Mid: zonemoment.Mid, - LastZoneMomentCreateDayStart: goproto.Int64(util.GetDayStartTimeStamp(time.Unix(lastzonemoment.GetUt(), 0))), - }) - if err != nil { - logger.Error("OpUpdateSelectively fail, err: %v", err) - ec = errcode.ErrCodeZoneMomentSrvFail - return - } } } } diff --git a/app/mix/service/logic/es_decorated.go b/app/mix/service/logic/es_decorated.go new file mode 100644 index 00000000..213da9c6 --- /dev/null +++ b/app/mix/service/logic/es_decorated.go @@ -0,0 +1,100 @@ +package logic + +import ( + streamerproto "service/api/proto/streamer/proto" + zoneproto "service/api/proto/zone/proto" + "service/bizcommon/util" + "service/dbstruct" + "service/library/logger" + "time" + + "github.com/gin-gonic/gin" + "github.com/qiniu/qmgo" + goproto "google.golang.org/protobuf/proto" +) + +type StreamerDecrtByEs struct { + StreamerAcct *StreamerAcct + Streamer *Streamer +} + +func NewStreamerDecrtByEs(streamerAcct *StreamerAcct, streamer *Streamer) *StreamerDecrtByEs { + return &StreamerDecrtByEs{ + StreamerAcct: streamerAcct, + Streamer: streamer, + } +} + +func (p *StreamerDecrtByEs) OpUpdate(ctx *gin.Context, req *streamerproto.OpUpdateReq) error { + if err := p.Streamer.OpUpdate(ctx, req); err != nil { + logger.Error("OpUpdate fail, req: %v, err: %v", util.ToJson(req), err) + return err + } + // 更新es + if err := p.StreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ + Mid: req.Streamer.Mid, + Age: req.Streamer.Age, + Height: req.Streamer.Height, + Weight: req.Streamer.Weight, + City: req.Streamer.City, + Constellation: req.Streamer.Constellation, + }); err != nil { + logger.Error("OpUpdateSelectively fail, req: %v, err: %v", util.ToJson(req), err) + return err + } + return nil +} + +type ZoneDecrtByEs struct { + StreamerAcct *StreamerAcct + Zone *Zone +} + +func NewZoneDecrtByEs(streamerAcct *StreamerAcct, zone *Zone) *ZoneDecrtByEs { + return &ZoneDecrtByEs{ + StreamerAcct: streamerAcct, + Zone: zone, + } +} + +func (p *ZoneDecrtByEs) OpUpdateByIds(ctx *gin.Context, req *zoneproto.OpUpdateReq, ids []int64) error { + err := p.Zone.OpUpdateByIds(ctx, req, ids) + if err != nil { + logger.Error("_DefaultZone OpUpdateByIds fail, req: %v, err: %v", util.ToJson(req), err) + return err + } + daystart := util.GetDayStartTimeStamp(time.Unix(req.Zone.GetLastZoneMomentCt(), 0)) + mids, err := p.Zone.GetMidsByZids(ctx, ids) + if err != nil { + logger.Error("_DefaultZone GetMidsByZids fail, req: %v, err: %v", util.ToJson(req), err) + return err + } + err = p.StreamerAcct.OpUpdateSelectivelyByMids(ctx, &dbstruct.EsStreamerAcctUpdater{ + LastZoneMomentCreateDayStart: goproto.Int64(daystart), + }, mids) + if err != nil { + logger.Error("_DefaultStreamerAcct OpUpdateSelectivelyByMids fail, req: %v, err: %v", util.ToJson(req), err) + return err + } + return nil +} + +func (p *ZoneDecrtByEs) OpUpdateByIdAndLastZoneMomentCt(ctx *gin.Context, req *zoneproto.OpUpdateReq, lastZoneMomentCt int64) error { + err := p.Zone.OpUpdateByIdAndLastZoneMomentCt(ctx, req, lastZoneMomentCt) + if err == qmgo.ErrNoSuchDocuments { + return nil + } + if err != nil { + logger.Error("OpUpdate fail, err: %v", err) + return err + } + err = p.StreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ + Mid: goproto.Int64(req.BaseRequest.Mid), + LastZoneMomentCreateDayStart: goproto.Int64(0), + }) + if err != nil { + logger.Error("OpUpdateSelectively fail, err: %v", err) + return err + } + return nil +} diff --git a/app/mix/service/logic/zone.go b/app/mix/service/logic/zone.go index e78037ce..98d5314f 100644 --- a/app/mix/service/logic/zone.go +++ b/app/mix/service/logic/zone.go @@ -166,3 +166,12 @@ func (p *Zone) GetZoneMapByMids(ctx *gin.Context, mids []int64) (map[int64][]*db } return _map, nil } + +func (p *Zone) GetMidsByZids(ctx *gin.Context, zids []int64) ([]int64, error) { + mids, err := p.store.GetMidsByZids(ctx, zids) + if err != nil { + logger.Error("GetMidsByZids fail, err: %v", err) + return nil, err + } + return mids, err +} diff --git a/app/mix/service/logic/zonemoment.go b/app/mix/service/logic/zonemoment.go index 499d82d2..9810ef7c 100644 --- a/app/mix/service/logic/zonemoment.go +++ b/app/mix/service/logic/zonemoment.go @@ -170,15 +170,6 @@ func (p *ZoneMoment) GetZidsByZoneMomentIds(ctx *gin.Context, zonemomentIds []in return zids, err } -func (p *ZoneMoment) GetMidsByZoneMomentIds(ctx *gin.Context, zonemomentIds []int64) ([]int64, error) { - mids, err := p.store.GetMidsByZoneMomentIds(ctx, zonemomentIds) - if err != nil { - logger.Error("GetMidsByZoneMomentIds fail, err: %v", err) - return nil, err - } - return mids, err -} - func (p *ZoneMoment) OpHeadByIds(ctx *gin.Context, ids []int64, opType int64) error { err := p.store.HeadZoneMomentByIds(ctx, ids, opType) if err != nil { diff --git a/app/mix/service/service.go b/app/mix/service/service.go index 1888d466..ebc0cf64 100644 --- a/app/mix/service/service.go +++ b/app/mix/service/service.go @@ -151,6 +151,9 @@ var ( _DefaultSingleDistributeHis *logic.SingleDistributeHis _DefaultAutoResponseCreateTimes *logic.AutoResponseCreateTimes _DefaultRavenIQTest *logic.RavenIQTest + + _DefaultStreamerDecrtByEs *logic.StreamerDecrtByEs + _DefaultZoneDecrtByEs *logic.ZoneDecrtByEs ) type Service struct { @@ -262,6 +265,9 @@ func (s *Service) Init(c any) (err error) { return nil, err }, }) + + _DefaultStreamerDecrtByEs = logic.NewStreamerDecrtByEs(_DefaultStreamerAcct, _DefaultStreamer) + _DefaultZoneDecrtByEs = logic.NewZoneDecrtByEs(_DefaultStreamerAcct, _DefaultZone) return } @@ -2344,7 +2350,7 @@ func (s *Service) OpUpdateStreamer(ctx *gin.Context, req *streamerproto.OpUpdate return } - err := _DefaultStreamer.OpUpdate(ctx, req) + err := _DefaultStreamerDecrtByEs.OpUpdate(ctx, req) if err == qmgo.ErrNoSuchDocuments { ec = errcode.ErrCodeStreamerNotExist err = nil @@ -2369,19 +2375,6 @@ func (s *Service) OpUpdateStreamer(ctx *gin.Context, req *streamerproto.OpUpdate ec = errcode.ErrCodeStreamerSrvFail return } - // 更新es - if err := _DefaultStreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ - Mid: req.Streamer.Mid, - Age: req.Streamer.Age, - Height: req.Streamer.Height, - Weight: req.Streamer.Weight, - City: req.Streamer.City, - Constellation: req.Streamer.Constellation, - }); err != nil { - logger.Error("OpUpdateSelectively fail, req: %v, err: %v", util.ToJson(req), err) - ec = errcode.ErrCodeStreamerSrvFail - return - } return } @@ -4184,56 +4177,34 @@ func (s *Service) OpDeleteZoneMoment(ctx *gin.Context, req *zonemomentproto.OpDe // 已经没有动态,最后更新时间回退到0 if len(list) == 0 { - err = _DefaultZone.OpUpdateByIdAndLastZoneMomentCt(ctx, &zoneproto.OpUpdateReq{ + err = _DefaultZoneDecrtByEs.OpUpdateByIdAndLastZoneMomentCt(ctx, &zoneproto.OpUpdateReq{ + BaseRequest: base.BaseRequest{Mid: zonemoment.GetMid()}, Zone: &dbstruct.Zone{ Id: goproto.Int64(zid), LastZoneMomentCt: goproto.Int64(0), }, }, zone.GetLastZoneMomentCt()) - if err == qmgo.ErrNoSuchDocuments { - return - } if err != nil { logger.Error("OpUpdate fail, err: %v", err) ec = errcode.ErrCodeZoneMomentSrvFail return } - err = _DefaultStreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ - Mid: zonemoment.Mid, - LastZoneMomentCreateDayStart: goproto.Int64(0), - }) - if err != nil { - logger.Error("OpUpdateSelectively fail, err: %v", err) - ec = errcode.ErrCodeZoneMomentSrvFail - return - } } else { //还有动态,看最后一条更新的动态的更新时间是否比删除的那条动态更早,若更早,则需要把空间的更新时间回拨 lastzonemoment := list[0] if lastzonemoment.GetUt() < zonemoment.GetUt() { - err = _DefaultZone.OpUpdateByIdAndLastZoneMomentCt(ctx, &zoneproto.OpUpdateReq{ + err = _DefaultZoneDecrtByEs.OpUpdateByIdAndLastZoneMomentCt(ctx, &zoneproto.OpUpdateReq{ + BaseRequest: base.BaseRequest{Mid: zonemoment.GetMid()}, Zone: &dbstruct.Zone{ Id: goproto.Int64(zid), LastZoneMomentCt: lastzonemoment.Ut, }, }, zone.GetLastZoneMomentCt()) - if err == qmgo.ErrNoSuchDocuments { - return - } if err != nil { logger.Error("OpUpdate fail, err: %v", err) ec = errcode.ErrCodeZoneMomentSrvFail return } - err = _DefaultStreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ - Mid: zonemoment.Mid, - LastZoneMomentCreateDayStart: lastzonemoment.Ut, - }) - if err != nil { - logger.Error("OpUpdateSelectively fail, err: %v", err) - ec = errcode.ErrCodeZoneMomentSrvFail - return - } } } } @@ -4341,32 +4312,16 @@ func (s *Service) OpReviewZoneMoment(ctx *gin.Context, req *zonemomentproto.OpRe ec = errcode.ErrCodeZoneMomentSrvFail return } - err = _DefaultZone.OpUpdateByIds(ctx, &zoneproto.OpUpdateReq{ + err = _DefaultZoneDecrtByEs.OpUpdateByIds(ctx, &zoneproto.OpUpdateReq{ Zone: &dbstruct.Zone{ LastZoneMomentCt: goproto.Int64(time.Now().Unix()), }, }, zids) if err != nil { - logger.Error("_DefaultZone OpUpdateByIds fail, req: %v, err: %v", util.ToJson(req), err) + logger.Error("_DefaultZoneDecrtByEs OpUpdateByIds fail, req: %v, err: %v", util.ToJson(req), err) ec = errcode.ErrCodeZoneMomentSrvFail return } - daystart := util.GetDayStartTimeStamp(time.Unix(time.Now().Unix(), 0)) - mids, err := _DefaultZoneMoment.GetMidsByZoneMomentIds(ctx, req.ZoneMomentIds) - if err != nil { - logger.Error("_DefaultZoneMoment GetMidsByZoneMomentIds fail, req: %v, err: %v", util.ToJson(req), err) - ec = errcode.ErrCodeZoneMomentSrvFail - return - } - err = _DefaultStreamerAcct.OpUpdateSelectivelyByMids(ctx, &dbstruct.EsStreamerAcctUpdater{ - LastZoneMomentCreateDayStart: goproto.Int64(daystart), - }, mids) - if err != nil { - logger.Error("_DefaultStreamerAcct OpUpdateSelectivelyByMids fail, req: %v, err: %v", util.ToJson(req), err) - ec = errcode.ErrCodeZoneMomentSrvFail - return - } - // 增加空间内统计总数 for _, id := range req.ZoneMomentIds { zonemoment, err := _DefaultZoneMoment.GetById(ctx, id) diff --git a/bizcommon/util/util.go b/bizcommon/util/util.go index 1d54396c..54968acd 100644 --- a/bizcommon/util/util.go +++ b/bizcommon/util/util.go @@ -376,8 +376,11 @@ func GetLastLessOrEqualForFloat64(arr []float64, target float64) int { } type Int64Filter struct { - LowerBound *int64 `json:"lower_bound" bson:"lower_bound"` - UpperBound *int64 `json:"upper_bound" bson:"upper_bound"` + LowerBound *int64 `json:"lower_bound" bson:"lower_bound"` + UpperBound *int64 `json:"upper_bound" bson:"upper_bound"` + Scale int64 `json:"scale" bson:"scale"` + Decay float64 `json:"decay" bson:"decay"` + Weight float64 `json:"weight" bson:"weight"` } func CreateEsInt64RangeQuery(name string, filter *Int64Filter) *elastic.RangeQuery { @@ -393,3 +396,26 @@ func CreateEsInt64RangeQuery(name string, filter *Int64Filter) *elastic.RangeQue } return query } + +func CreateGaussDecayFunction(name string, filter *Int64Filter, refFilter *Int64Filter) *elastic.GaussDecayFunction { + if filter == nil { + return nil + } + lb := DerefInt64(filter.LowerBound) + ub := DerefInt64(filter.UpperBound) + if filter.LowerBound == nil { + lb = DerefInt64(refFilter.LowerBound) + } + if filter.UpperBound == nil { + ub = DerefInt64(refFilter.UpperBound) + } + origin := float64(lb+ub) / 2 + offset := float64(-lb+ub) / 2 + return elastic.NewGaussDecayFunction().FieldName(name).Origin(origin).Offset(offset).Scale(refFilter.Scale).Decay(refFilter.Decay).Weight(refFilter.Weight) +} + +func CreateKeywordScriptScoreFunction(name string, value string, weight float64) *elastic.ScriptFunction { + scriptStr := "doc['%s'].value == '%s' ? %f : %f" + script := elastic.NewScript(fmt.Sprintf(scriptStr, name, value, 1.0, 0.0)) + return elastic.NewScriptFunction(script).Weight(weight) +} -- 2.41.0 From 5abd6883d4302fa876fda7db81a2dc7abab2186b Mon Sep 17 00:00:00 2001 From: Leufolium Date: Fri, 16 Aug 2024 07:06:12 +0800 Subject: [PATCH 03/11] by Robin at 20240816 --- api/proto/streamer/proto/streamer_vo_api.go | 2 ++ app/mix/dao/elasticsearch.go | 5 ++++- app/mix/service/apiservice.go | 12 +++++++++--- app/mix/service/logic/streamer_acct.go | 8 ++++---- bizcommon/util/util.go | 2 +- 5 files changed, 20 insertions(+), 9 deletions(-) diff --git a/api/proto/streamer/proto/streamer_vo_api.go b/api/proto/streamer/proto/streamer_vo_api.go index 9b281551..e5c55d49 100644 --- a/api/proto/streamer/proto/streamer_vo_api.go +++ b/api/proto/streamer/proto/streamer_vo_api.go @@ -45,6 +45,8 @@ type ApiListExtVO struct { Zones []*dbstruct.Zone `json:"zones" bson:"zones"` IsActiveWithinAWeek *int64 `json:"is_active_within_a_week"` // 7日内空间是否活跃 DaysElapsedSinceTheLastZonesUpdate *int64 `json:"days_elapsed_since_the_last_zones_update"` // 空间最后活跃时间距离现在时间跨度 + + Score float64 `json:"score"` } func (vo *ApiListExtVO) GetMid() int64 { diff --git a/app/mix/dao/elasticsearch.go b/app/mix/dao/elasticsearch.go index c60ebde0..a09189df 100644 --- a/app/mix/dao/elasticsearch.go +++ b/app/mix/dao/elasticsearch.go @@ -174,7 +174,7 @@ func (es *ElasticSearch) GetStreamerAcctListFuzzilyByUserId(ctx *gin.Context, re return } -func (es *ElasticSearch) FilterStreamerAcctList(ctx *gin.Context, req *streameracctproto.OpFilterReq) (list []*dbstruct.EsStreamerAcct, scorelist []float64, fullscore int64, err error) { +func (es *ElasticSearch) FilterStreamerAcctList(ctx *gin.Context, req *streameracctproto.OpFilterReq) (list []*dbstruct.EsStreamerAcct, scorelist []float64, fullscore float64, err error) { list = make([]*dbstruct.EsStreamerAcct, 0) scorelist = make([]float64, 0) @@ -242,6 +242,9 @@ func (es *ElasticSearch) FilterStreamerAcctList(ctx *gin.Context, req *streamera // 按得分倒序排列 sortClause := elastic.NewScoreSort().Desc() + source, _ := funcScoreQuery.Source() + logger.Info(util.ToJson(source)) + res, err := es.clientMix.Search(es.getIndexStreamerAcct()).Query(funcScoreQuery).From(req.Offset).Size(req.Limit).SortBy(sortClause).Do(ctx) if err != nil { logger.Error("Search %v fail, err: %v", IndexStreamerAcct, err) diff --git a/app/mix/service/apiservice.go b/app/mix/service/apiservice.go index fd87e35d..7e7d7fe7 100644 --- a/app/mix/service/apiservice.go +++ b/app/mix/service/apiservice.go @@ -1786,7 +1786,7 @@ func (s *Service) ApiFilterStreamer(ctx *gin.Context, req *streamerproto.ApiFilt } //1.从主播用户表中模糊匹配所有主播信息的用户侧数据,按mid正序排序 - streameraccts, err := _DefaultStreamerAcct.OpFilterStreamerAcct(ctx, &streameracctproto.OpFilterReq{ + streameraccts, scores, fullscore, err := _DefaultStreamerAcct.OpFilterStreamerAcct(ctx, &streameracctproto.OpFilterReq{ Age: req.Age, Fans: req.Fans, Height: req.Height, @@ -1818,9 +1818,15 @@ func (s *Service) ApiFilterStreamer(ctx *gin.Context, req *streamerproto.ApiFilt return } streamerlist = make([]*streamerproto.ApiListExtVO, 0) - for _, mid := range mids { + recommlist = make([]*streamerproto.ApiListExtVO, 0) + for i, mid := range mids { vo, _ := mp[mid].(*streamerproto.ApiListExtVO) - streamerlist = append(streamerlist, vo) + vo.Score = scores[i] + if scores[i] < fullscore { + recommlist = append(recommlist, vo) + } else { + streamerlist = append(streamerlist, vo) + } } return diff --git a/app/mix/service/logic/streamer_acct.go b/app/mix/service/logic/streamer_acct.go index 70959ea5..adfc4ab7 100644 --- a/app/mix/service/logic/streamer_acct.go +++ b/app/mix/service/logic/streamer_acct.go @@ -75,11 +75,11 @@ func (p *StreamerAcct) OpListStreamerAcctFuzzilyByName(ctx *gin.Context, req *st return list, nil } -func (p *StreamerAcct) OpFilterStreamerAcct(ctx *gin.Context, req *streameracctproto.OpFilterReq) ([]*dbstruct.EsStreamerAcct, error) { - list, err := p.store.FilterStreamerAcctList(ctx, req) +func (p *StreamerAcct) OpFilterStreamerAcct(ctx *gin.Context, req *streameracctproto.OpFilterReq) ([]*dbstruct.EsStreamerAcct, []float64, float64, error) { + list, scorelist, fullscore, err := p.store.FilterStreamerAcctList(ctx, req) if err != nil { logger.Error("FilterStreamerAcctList fail, err: %v", err) - return nil, err + return nil, nil, 0, err } - return list, nil + return list, scorelist, fullscore, nil } diff --git a/bizcommon/util/util.go b/bizcommon/util/util.go index 54968acd..cddc0d2c 100644 --- a/bizcommon/util/util.go +++ b/bizcommon/util/util.go @@ -378,7 +378,7 @@ func GetLastLessOrEqualForFloat64(arr []float64, target float64) int { type Int64Filter struct { LowerBound *int64 `json:"lower_bound" bson:"lower_bound"` UpperBound *int64 `json:"upper_bound" bson:"upper_bound"` - Scale int64 `json:"scale" bson:"scale"` + Scale float64 `json:"scale" bson:"scale"` Decay float64 `json:"decay" bson:"decay"` Weight float64 `json:"weight" bson:"weight"` } -- 2.41.0 From 58e5c6ab1de252e1b9a638394a3f7550ea4633c3 Mon Sep 17 00:00:00 2001 From: Leufolium Date: Tue, 20 Aug 2024 17:35:50 +0800 Subject: [PATCH 04/11] by Robin at 20240820 --- .../streamer_acct/proto/streamer_acct_op.go | 2 + api/proto/zone/proto/zone_op.go | 6 ++ apollostruct/streamer_filter.go | 2 + app/mix/dao/elasticsearch.go | 12 ++- app/mix/service/apiservice.go | 12 +-- app/mix/service/logic/es_decorated.go | 74 +++++++++++++++++-- app/mix/service/service.go | 37 +++++++++- dbstruct/streamer_acct.go | 4 + 8 files changed, 133 insertions(+), 16 deletions(-) diff --git a/api/proto/streamer_acct/proto/streamer_acct_op.go b/api/proto/streamer_acct/proto/streamer_acct_op.go index 4d714be0..e7bb2cbc 100644 --- a/api/proto/streamer_acct/proto/streamer_acct_op.go +++ b/api/proto/streamer_acct/proto/streamer_acct_op.go @@ -49,6 +49,8 @@ type OpFilterReq struct { Fans *util.Int64Filter `json:"fans"` Height *util.Int64Filter `json:"height"` Weight *util.Int64Filter `json:"weight"` + WechatCoinPrice *util.Int64Filter `json:"wechat_coin_price"` + ZoneAdmissionPrice *util.Int64Filter `json:"zone_admission_price"` City *string `json:"city"` Constellation *string `json:"constellation"` IsActiveWithinAWeek *int64 `json:"is_active_within_a_week"` diff --git a/api/proto/zone/proto/zone_op.go b/api/proto/zone/proto/zone_op.go index 49d3bfb8..37071184 100644 --- a/api/proto/zone/proto/zone_op.go +++ b/api/proto/zone/proto/zone_op.go @@ -9,6 +9,12 @@ import ( type OpCreateReq struct { base.BaseRequest *dbstruct.Zone + AdmissionPrice int64 `json:"admission_price"` // 空间解锁价格, 单位: 分 + IronfanshipPrice int64 `json:"ironfanship_price"` // 铁粉解锁价格, 单位: 分 + IsSuperfanshipEnabled int `json:"is_superfanship_enabled"` // 是否开启超粉空间 0: 关闭, 1: 开启 + SuperfanshipPrice int64 `json:"superfanship_price"` // 超粉价格, 单位: 分 + SuperfanshipValidPeriod int `json:"superfanship_valid_period"` // 超粉有效期类型, SuperfanshipValidPeriod* + IsSuperfanshipGiveWechat int `json:"is_superfanship_give_wechat"` // 是否开启超粉空间赠送微信 0: 不赠送, 1: 赠送 } type OpCreateData struct { diff --git a/apollostruct/streamer_filter.go b/apollostruct/streamer_filter.go index bc38d979..4dea5e59 100644 --- a/apollostruct/streamer_filter.go +++ b/apollostruct/streamer_filter.go @@ -7,6 +7,8 @@ type StreamerFilterCfg struct { Fans *util.Int64Filter `json:"fans"` Height *util.Int64Filter `json:"height"` Weight *util.Int64Filter `json:"weight"` + WechatCoinPrice *util.Int64Filter `json:"wechat_coin_price"` + ZoneAdmissionPrice *util.Int64Filter `json:"zone_admission_price"` CityWeight float64 `json:"city_weight"` ConstellationWeight float64 `json:"constellation_weight"` IsActiveWithinAWeekWeight float64 `json:"is_active_within_a_week_weight"` diff --git a/app/mix/dao/elasticsearch.go b/app/mix/dao/elasticsearch.go index a09189df..1d522f0b 100644 --- a/app/mix/dao/elasticsearch.go +++ b/app/mix/dao/elasticsearch.go @@ -193,12 +193,14 @@ func (es *ElasticSearch) FilterStreamerAcctList(ctx *gin.Context, req *streamera funcScoreQuery.MaxBoost(3.4028235e38) funcScoreQuery.Boost(1.0) - // 写入关于年龄、粉丝量、身高、体重的高斯衰减打分 + // 写入关于年龄、粉丝量、身高、体重、微信价格、空间价格的高斯衰减打分 matchall := elastic.NewMatchAllQuery().Boost(1.0) agefunc := util.CreateGaussDecayFunction("age", req.Age, req.RefStreamerFilter.Age) fansfunc := util.CreateGaussDecayFunction("fans", req.Fans, req.RefStreamerFilter.Fans) heightfunc := util.CreateGaussDecayFunction("height", req.Height, req.RefStreamerFilter.Height) weightfunc := util.CreateGaussDecayFunction("weight", req.Weight, req.RefStreamerFilter.Weight) + wechatcoinpricefunc := util.CreateGaussDecayFunction("wechat_coin_price", req.WechatCoinPrice, req.RefStreamerFilter.WechatCoinPrice) + zoneadmissionpricefunc := util.CreateGaussDecayFunction("zone_admission_price", req.ZoneAdmissionPrice, req.RefStreamerFilter.ZoneAdmissionPrice) if agefunc != nil { funcScoreQuery.Add(matchall, agefunc) @@ -216,6 +218,14 @@ func (es *ElasticSearch) FilterStreamerAcctList(ctx *gin.Context, req *streamera funcScoreQuery.Add(matchall, weightfunc) fullscore += 1.0 } + if wechatcoinpricefunc != nil { + funcScoreQuery.Add(matchall, wechatcoinpricefunc) + fullscore += 1.0 + } + if zoneadmissionpricefunc != nil { + funcScoreQuery.Add(matchall, zoneadmissionpricefunc) + fullscore += 1.0 + } // 写入关于城市、星座的脚本打分 if req.City != nil { diff --git a/app/mix/service/apiservice.go b/app/mix/service/apiservice.go index 7e7d7fe7..8c6825af 100644 --- a/app/mix/service/apiservice.go +++ b/app/mix/service/apiservice.go @@ -2645,8 +2645,9 @@ func (s *Service) ApiCreateZone(ctx *gin.Context, req *zoneproto.ApiCreateReq) ( req.Zone.VideoCount = goproto.Int64(0) req.Zone.IsZoneThirdPartnerHided = goproto.Int64(consts.IsHided_No) req.Zone.Profile = goproto.String(defaultZoneText) - err, zid := _DefaultZone.OpCreate(ctx, &zoneproto.OpCreateReq{ - Zone: req.Zone, + err, zid := _DefaultZoneDecrtByEs.OpCreate(ctx, &zoneproto.OpCreateReq{ + Zone: req.Zone, + AdmissionPrice: req.AdmissionPrice, }) if err != nil { logger.Error("OpCreate fail, req: %v, err: %v", util.ToJson(req), err) @@ -2725,8 +2726,9 @@ func (s *Service) ApiUpdateZone(ctx *gin.Context, req *zoneproto.ApiUpdateReq) ( profile := req.Zone.GetProfile() req.Zone.Profile = nil - err = _DefaultZone.OpUpdate(ctx, &zoneproto.OpUpdateReq{ - Zone: req.Zone, + err = _DefaultZoneDecrtByEs.OpUpdate(ctx, &zoneproto.OpUpdateReq{ + Zone: req.Zone, + AdmissionPrice: req.AdmissionPrice, }) if err == qmgo.ErrNoSuchDocuments { ec = errcode.ErrCodeZoneNotExist @@ -2767,7 +2769,7 @@ func (s *Service) ApiUpdateZone(ctx *gin.Context, req *zoneproto.ApiUpdateReq) ( func (s *Service) ApiDeleteZone(ctx *gin.Context, id int64) (ec errcode.ErrCode) { ec = errcode.ErrCodeZoneSrvOk - err := _DefaultZone.OpDelete(ctx, id) + err := _DefaultZoneDecrtByEs.OpDelete(ctx, id) if err != nil { logger.Error("OpDelete fail, id: %v, err: %v", id, err) ec = errcode.ErrCodeZoneSrvFail diff --git a/app/mix/service/logic/es_decorated.go b/app/mix/service/logic/es_decorated.go index 213da9c6..6a0e54e1 100644 --- a/app/mix/service/logic/es_decorated.go +++ b/app/mix/service/logic/es_decorated.go @@ -32,12 +32,13 @@ func (p *StreamerDecrtByEs) OpUpdate(ctx *gin.Context, req *streamerproto.OpUpda } // 更新es if err := p.StreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ - Mid: req.Streamer.Mid, - Age: req.Streamer.Age, - Height: req.Streamer.Height, - Weight: req.Streamer.Weight, - City: req.Streamer.City, - Constellation: req.Streamer.Constellation, + Mid: req.Streamer.Mid, + Age: req.Streamer.Age, + Height: req.Streamer.Height, + Weight: req.Streamer.Weight, + City: req.Streamer.City, + Constellation: req.Streamer.Constellation, + WechatCoinPrice: req.Streamer.WechatCoinPrice, }); err != nil { logger.Error("OpUpdateSelectively fail, req: %v, err: %v", util.ToJson(req), err) return err @@ -57,6 +58,67 @@ func NewZoneDecrtByEs(streamerAcct *StreamerAcct, zone *Zone) *ZoneDecrtByEs { } } +func (p *ZoneDecrtByEs) OpCreate(ctx *gin.Context, req *zoneproto.OpCreateReq) (error, int64) { + err, id := p.Zone.OpCreate(ctx, req) + if err != nil { + logger.Error("CreateZone fail, req: %v, err: %v", util.ToJson(req), err) + return err, id + } + + // 更新es + if err := p.StreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ + Mid: req.Zone.Mid, + ZoneAdmissionPrice: goproto.Int64(req.AdmissionPrice), + }); err != nil { + logger.Error("OpUpdateSelectively fail, req: %v, err: %v", util.ToJson(req), err) + return err, id + } + return nil, id +} + +func (p *ZoneDecrtByEs) OpUpdate(ctx *gin.Context, req *zoneproto.OpUpdateReq) error { + err := p.Zone.OpUpdate(ctx, req) + if err != nil { + logger.Error("UpdateZone fail, err: %v", err) + return err + } + + // 更新es + if err := p.StreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ + Mid: goproto.Int64(req.BaseRequest.Mid), + ZoneAdmissionPrice: goproto.Int64(req.AdmissionPrice), + }); err != nil { + logger.Error("OpUpdateSelectively fail, req: %v, err: %v", util.ToJson(req), err) + return err + } + return nil +} + +func (p *ZoneDecrtByEs) OpDelete(ctx *gin.Context, id int64) error { + + zone, err := p.Zone.GetById(ctx, id) + if err != nil { + logger.Error("Zone GetById fail, err: %v", err) + return err + } + + err = p.Zone.OpDelete(ctx, id) + if err != nil { + logger.Error("DeleteZone fail, err: %v", err) + return err + } + + // 更新es + if err := p.StreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ + Mid: zone.Mid, + ZoneAdmissionPrice: goproto.Int64(-1), + }); err != nil { + logger.Error("OpUpdateSelectively fail, err: %v", err) + return err + } + return nil +} + func (p *ZoneDecrtByEs) OpUpdateByIds(ctx *gin.Context, req *zoneproto.OpUpdateReq, ids []int64) error { err := p.Zone.OpUpdateByIds(ctx, req, ids) if err != nil { diff --git a/app/mix/service/service.go b/app/mix/service/service.go index ebc0cf64..2a2a2f89 100644 --- a/app/mix/service/service.go +++ b/app/mix/service/service.go @@ -1891,7 +1891,7 @@ func (s *Service) OpApproveStreamerAuthApproval(ctx *gin.Context, req *streamera return } streamerAccts := make([]*dbstruct.EsStreamerAcct, 0) - for _, streamer := range streamerList { + for i, streamer := range streamerList { acct := accountMp[streamer.GetMid()] streamerAccts = append(streamerAccts, &dbstruct.EsStreamerAcct{ Mid: acct.GetMid(), @@ -1904,6 +1904,8 @@ func (s *Service) OpApproveStreamerAuthApproval(ctx *gin.Context, req *streamera City: streamer.GetCity(), Constellation: streamer.GetConstellation(), LastZoneMomentCreateDayStart: 0, + WechatCoinPrice: util.DerefInt64(list[i].WechatCoinPrice), + ZoneAdmissionPrice: -1, // es只用于过滤搜索,设置为-1不影响付款 Ct: acct.GetCt(), Ut: acct.GetUt(), DelFlag: acct.GetDelFlag(), @@ -2264,6 +2266,8 @@ func (s *Service) OpApproveStreamerAuthApprovalDetails(ctx *gin.Context, req *st City: streamer.GetCity(), Constellation: streamer.GetConstellation(), LastZoneMomentCreateDayStart: 0, + WechatCoinPrice: util.DerefInt64(streamerAuthApprovalDetails.WechatCoinPrice), + ZoneAdmissionPrice: -1, // es只用于过滤搜索,设置为-1不影响付款 Ct: acct.GetCt(), Ut: acct.GetUt(), DelFlag: acct.GetDelFlag(), @@ -3877,7 +3881,7 @@ func (s *Service) OpGetAccountCancellationList(ctx *gin.Context, req *account_ca func (s *Service) OpCreateZone(ctx *gin.Context, req *zoneproto.OpCreateReq) (ec errcode.ErrCode) { ec = errcode.ErrCodeZoneSrvOk req.Zone.IsZoneThirdPartnerHided = goproto.Int64(consts.IsHided_No) - err, _ := _DefaultZone.OpCreate(ctx, req) + err, _ := _DefaultZoneDecrtByEs.OpCreate(ctx, req) if err != nil { logger.Error("OpCreate fail, req: %v, err: %v", util.ToJson(req), err) ec = errcode.ErrCodeZoneSrvFail @@ -3888,7 +3892,32 @@ func (s *Service) OpCreateZone(ctx *gin.Context, req *zoneproto.OpCreateReq) (ec func (s *Service) OpUpdateZone(ctx *gin.Context, req *zoneproto.OpUpdateReq) (ec errcode.ErrCode) { ec = errcode.ErrCodeZoneSrvOk - err := _DefaultZone.OpUpdate(ctx, req) + + // 转为更新自己空间请求 + selfReq := &zoneproto.OpUpdateReq{ + BaseRequest: req.BaseRequest, + Zone: req.Zone, + AdmissionPrice: req.AdmissionPrice, + IronfanshipPrice: req.IronfanshipPrice, + IsSuperfanshipEnabled: req.IsSuperfanshipEnabled, + SuperfanshipPrice: req.SuperfanshipPrice, + SuperfanshipValidPeriod: req.SuperfanshipValidPeriod, + IsSuperfanshipGiveWechat: req.IsSuperfanshipGiveWechat, + } + zone, err := _DefaultZone.GetById(ctx, req.Zone.GetId()) + if err == qmgo.ErrNoSuchDocuments { + ec = errcode.ErrCodeZoneNotExist + err = nil + return + } + if err != nil { + logger.Error("Zone GetById fail, err: %v", err) + ec = errcode.ErrCodeZoneSrvFail + return + } + selfReq.BaseRequest.Mid = zone.GetMid() + + err = _DefaultZoneDecrtByEs.OpUpdate(ctx, selfReq) if err == qmgo.ErrNoSuchDocuments { ec = errcode.ErrCodeZoneNotExist err = nil @@ -3924,7 +3953,7 @@ func (s *Service) OpUpdateZone(ctx *gin.Context, req *zoneproto.OpUpdateReq) (ec func (s *Service) OpDeleteZone(ctx *gin.Context, id int64) (ec errcode.ErrCode) { ec = errcode.ErrCodeZoneSrvOk - err := _DefaultZone.OpDelete(ctx, id) + err := _DefaultZoneDecrtByEs.OpDelete(ctx, id) if err != nil { logger.Error("OpDelete fail, id: %v, err: %v", id, err) ec = errcode.ErrCodeZoneSrvFail diff --git a/dbstruct/streamer_acct.go b/dbstruct/streamer_acct.go index de502b27..bc55648f 100644 --- a/dbstruct/streamer_acct.go +++ b/dbstruct/streamer_acct.go @@ -11,6 +11,8 @@ type EsStreamerAcct struct { City string `json:"city"` // 所在城市 Constellation string `json:"constellation"` // 星座 LastZoneMomentCreateDayStart int64 `json:"last_zone_moment_create_day_start"` // 最后空间动态创建日始整点 + WechatCoinPrice int64 `json:"wechat_coin_price"` // 微信金币价格 + ZoneAdmissionPrice int64 `json:"zone_admission_price"` // 空间解锁价格, 单位: 分 Ct int64 `json:"ct"` // 创建时间 Ut int64 `json:"ut"` // 更新时间 DelFlag int64 `json:"del_flag"` // 删除标记,0-否,1-是 @@ -27,6 +29,8 @@ type EsStreamerAcctUpdater struct { City *string `json:"city" bson:"city"` // 所在城市 Constellation *string `json:"constellation" bson:"constellation"` // 星座 LastZoneMomentCreateDayStart *int64 `json:"last_zone_moment_create_day_start" bson:"last_zone_moment_create_day_start"` // 最后空间动态创建日始整点 + WechatCoinPrice *int64 `json:"wechat_coin_price"` // 微信金币价格 + ZoneAdmissionPrice *int64 `json:"zone_admission_price"` // 空间解锁价格, 单位: 分 } func (p *EsStreamerAcctUpdater) GetMid() int64 { -- 2.41.0 From 7a3754550f2e5655f2bf610afff18410297d6764 Mon Sep 17 00:00:00 2001 From: Leufolium Date: Tue, 20 Aug 2024 18:47:56 +0800 Subject: [PATCH 05/11] by Robin at 20240820; no zone price --- api/consts/option.go | 2 ++ app/mix/service/logic/es_decorated.go | 3 ++- app/mix/service/service.go | 4 ++-- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/api/consts/option.go b/api/consts/option.go index c70abb54..78ab15a9 100644 --- a/api/consts/option.go +++ b/api/consts/option.go @@ -98,3 +98,5 @@ const ( StreamerAuthApproval_OneOffVersion = 0 StreamerAuthApproval_SeparateVersion = 1 ) + +const ZoneAdmissionPrice_NoZone = int64(-999999) diff --git a/app/mix/service/logic/es_decorated.go b/app/mix/service/logic/es_decorated.go index 6a0e54e1..12245df3 100644 --- a/app/mix/service/logic/es_decorated.go +++ b/app/mix/service/logic/es_decorated.go @@ -1,6 +1,7 @@ package logic import ( + "service/api/consts" streamerproto "service/api/proto/streamer/proto" zoneproto "service/api/proto/zone/proto" "service/bizcommon/util" @@ -111,7 +112,7 @@ func (p *ZoneDecrtByEs) OpDelete(ctx *gin.Context, id int64) error { // 更新es if err := p.StreamerAcct.OpUpdateSelectively(ctx, &dbstruct.EsStreamerAcctUpdater{ Mid: zone.Mid, - ZoneAdmissionPrice: goproto.Int64(-1), + ZoneAdmissionPrice: goproto.Int64(consts.ZoneAdmissionPrice_NoZone), }); err != nil { logger.Error("OpUpdateSelectively fail, err: %v", err) return err diff --git a/app/mix/service/service.go b/app/mix/service/service.go index 2a2a2f89..a0b229e3 100644 --- a/app/mix/service/service.go +++ b/app/mix/service/service.go @@ -1905,7 +1905,7 @@ func (s *Service) OpApproveStreamerAuthApproval(ctx *gin.Context, req *streamera Constellation: streamer.GetConstellation(), LastZoneMomentCreateDayStart: 0, WechatCoinPrice: util.DerefInt64(list[i].WechatCoinPrice), - ZoneAdmissionPrice: -1, // es只用于过滤搜索,设置为-1不影响付款 + ZoneAdmissionPrice: consts.ZoneAdmissionPrice_NoZone, // es只用于过滤搜索,设置不影响付款 Ct: acct.GetCt(), Ut: acct.GetUt(), DelFlag: acct.GetDelFlag(), @@ -2267,7 +2267,7 @@ func (s *Service) OpApproveStreamerAuthApprovalDetails(ctx *gin.Context, req *st Constellation: streamer.GetConstellation(), LastZoneMomentCreateDayStart: 0, WechatCoinPrice: util.DerefInt64(streamerAuthApprovalDetails.WechatCoinPrice), - ZoneAdmissionPrice: -1, // es只用于过滤搜索,设置为-1不影响付款 + ZoneAdmissionPrice: consts.ZoneAdmissionPrice_NoZone, // es只用于过滤搜索,设置不影响付款 Ct: acct.GetCt(), Ut: acct.GetUt(), DelFlag: acct.GetDelFlag(), -- 2.41.0 From 18e50bbd86d772aa751e1806ac9464449013d4cd Mon Sep 17 00:00:00 2001 From: Leufolium Date: Wed, 21 Aug 2024 22:19:56 +0800 Subject: [PATCH 06/11] by Robin at 20240821 --- app/mix/dao/elasticsearch.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/mix/dao/elasticsearch.go b/app/mix/dao/elasticsearch.go index 1d522f0b..8624fcee 100644 --- a/app/mix/dao/elasticsearch.go +++ b/app/mix/dao/elasticsearch.go @@ -228,11 +228,11 @@ func (es *ElasticSearch) FilterStreamerAcctList(ctx *gin.Context, req *streamera } // 写入关于城市、星座的脚本打分 - if req.City != nil { + if req.City != nil && util.DerefString(req.City) != "" { funcScoreQuery.AddScoreFunc(util.CreateKeywordScriptScoreFunction("city", util.DerefString(req.City), req.RefStreamerFilter.CityWeight)) fullscore += 1.0 } - if req.Constellation != nil { + if req.Constellation != nil && util.DerefString(req.Constellation) != "" { funcScoreQuery.AddScoreFunc(util.CreateKeywordScriptScoreFunction("constellation", util.DerefString(req.Constellation), req.RefStreamerFilter.ConstellationWeight)) fullscore += 1.0 } -- 2.41.0 From 75413625fdfcbefbdacc6350e84362e6f51c9a41 Mon Sep 17 00:00:00 2001 From: Leufolium Date: Thu, 22 Aug 2024 12:19:01 +0800 Subject: [PATCH 07/11] by Robin at 20240822 --- app/mix/dao/elasticsearch.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/app/mix/dao/elasticsearch.go b/app/mix/dao/elasticsearch.go index 8624fcee..9d7fc29f 100644 --- a/app/mix/dao/elasticsearch.go +++ b/app/mix/dao/elasticsearch.go @@ -204,37 +204,37 @@ func (es *ElasticSearch) FilterStreamerAcctList(ctx *gin.Context, req *streamera if agefunc != nil { funcScoreQuery.Add(matchall, agefunc) - fullscore += 1.0 + fullscore += req.RefStreamerFilter.Age.Weight } if fansfunc != nil { funcScoreQuery.Add(matchall, fansfunc) - fullscore += 1.0 + fullscore += req.RefStreamerFilter.Fans.Weight } if heightfunc != nil { funcScoreQuery.Add(matchall, heightfunc) - fullscore += 1.0 + fullscore += req.RefStreamerFilter.Height.Weight } if weightfunc != nil { funcScoreQuery.Add(matchall, weightfunc) - fullscore += 1.0 + fullscore += req.RefStreamerFilter.Weight.Weight } if wechatcoinpricefunc != nil { funcScoreQuery.Add(matchall, wechatcoinpricefunc) - fullscore += 1.0 + fullscore += req.RefStreamerFilter.WechatCoinPrice.Weight } if zoneadmissionpricefunc != nil { funcScoreQuery.Add(matchall, zoneadmissionpricefunc) - fullscore += 1.0 + fullscore += req.RefStreamerFilter.ZoneAdmissionPrice.Weight } // 写入关于城市、星座的脚本打分 if req.City != nil && util.DerefString(req.City) != "" { funcScoreQuery.AddScoreFunc(util.CreateKeywordScriptScoreFunction("city", util.DerefString(req.City), req.RefStreamerFilter.CityWeight)) - fullscore += 1.0 + fullscore += req.RefStreamerFilter.CityWeight } if req.Constellation != nil && util.DerefString(req.Constellation) != "" { funcScoreQuery.AddScoreFunc(util.CreateKeywordScriptScoreFunction("constellation", util.DerefString(req.Constellation), req.RefStreamerFilter.ConstellationWeight)) - fullscore += 1.0 + fullscore += req.RefStreamerFilter.ConstellationWeight } // 写入关于是否7日内有更新的脚本打分 @@ -245,7 +245,7 @@ func (es *ElasticSearch) FilterStreamerAcctList(ctx *gin.Context, req *streamera scriptStr := "doc['%s'].value >= %d ? %f : %f" script := elastic.NewScript(fmt.Sprintf(scriptStr, "last_zone_moment_create_day_start", daystartBeforeAWeek, 1.0, 0.0)) funcScoreQuery.AddScoreFunc(elastic.NewScriptFunction(script).Weight(req.RefStreamerFilter.IsActiveWithinAWeekWeight)) - fullscore += 1.0 + fullscore += req.RefStreamerFilter.IsActiveWithinAWeekWeight } } -- 2.41.0 From eb4f10506ed6df7703f6a5372915d4a23e503626 Mon Sep 17 00:00:00 2001 From: Leufolium Date: Thu, 29 Aug 2024 14:28:52 +0800 Subject: [PATCH 08/11] add api --- api/proto/streamer/proto/streamer_api.go | 2 ++ app/mix/service/apiservice.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/api/proto/streamer/proto/streamer_api.go b/api/proto/streamer/proto/streamer_api.go index b496d186..88d44952 100644 --- a/api/proto/streamer/proto/streamer_api.go +++ b/api/proto/streamer/proto/streamer_api.go @@ -202,6 +202,8 @@ type ApiFilterReq struct { Fans *util.Int64Filter `json:"fans"` Height *util.Int64Filter `json:"height"` Weight *util.Int64Filter `json:"weight"` + WechatCoinPrice *util.Int64Filter `json:"wechat_coin_price"` + ZoneAdmissionPrice *util.Int64Filter `json:"zone_admission_price"` City *string `json:"city"` Constellation *string `json:"constellation"` IsActiveWithinAWeek *int64 `json:"is_active_within_a_week"` diff --git a/app/mix/service/apiservice.go b/app/mix/service/apiservice.go index 8c6825af..827e0815 100644 --- a/app/mix/service/apiservice.go +++ b/app/mix/service/apiservice.go @@ -1791,6 +1791,8 @@ func (s *Service) ApiFilterStreamer(ctx *gin.Context, req *streamerproto.ApiFilt Fans: req.Fans, Height: req.Height, Weight: req.Weight, + WechatCoinPrice: req.WechatCoinPrice, + ZoneAdmissionPrice: req.ZoneAdmissionPrice, City: req.City, Constellation: req.Constellation, IsActiveWithinAWeek: req.IsActiveWithinAWeek, -- 2.41.0 From 2a12aee17044c7ba5e59c903383dc73c6c7a3584 Mon Sep 17 00:00:00 2001 From: Leufolium Date: Thu, 29 Aug 2024 16:01:45 +0800 Subject: [PATCH 09/11] by Robin at 20240829 --- dbstruct/streamer_acct.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dbstruct/streamer_acct.go b/dbstruct/streamer_acct.go index bc55648f..2d7cd9cd 100644 --- a/dbstruct/streamer_acct.go +++ b/dbstruct/streamer_acct.go @@ -8,6 +8,7 @@ type EsStreamerAcct struct { Age int64 `json:"age"` // 年龄 Height int64 `json:"height"` // 身高 Weight int64 `json:"weight"` // 体重 + Fans int64 `json:"fans"` // 粉丝量 City string `json:"city"` // 所在城市 Constellation string `json:"constellation"` // 星座 LastZoneMomentCreateDayStart int64 `json:"last_zone_moment_create_day_start"` // 最后空间动态创建日始整点 @@ -26,6 +27,7 @@ type EsStreamerAcctUpdater struct { Age *int64 `json:"age" bson:"age"` // 年龄 Height *int64 `json:"height" bson:"height"` // 身高 Weight *int64 `json:"weight" bson:"weight"` // 体重 + Fans *int64 `json:"fans"` // 粉丝量 City *string `json:"city" bson:"city"` // 所在城市 Constellation *string `json:"constellation" bson:"constellation"` // 星座 LastZoneMomentCreateDayStart *int64 `json:"last_zone_moment_create_day_start" bson:"last_zone_moment_create_day_start"` // 最后空间动态创建日始整点 -- 2.41.0 From 22aebfe573d97b05a1c9223080081767c1c163a0 Mon Sep 17 00:00:00 2001 From: Leufolium Date: Thu, 29 Aug 2024 17:30:14 +0800 Subject: [PATCH 10/11] by Robin at 20240829 cut zero width unicode --- app/mix/service/apiservice.go | 3 +++ bizcommon/util/util.go | 7 +++++++ 2 files changed, 10 insertions(+) diff --git a/app/mix/service/apiservice.go b/app/mix/service/apiservice.go index 827e0815..1cadfb9c 100644 --- a/app/mix/service/apiservice.go +++ b/app/mix/service/apiservice.go @@ -1785,6 +1785,9 @@ func (s *Service) ApiFilterStreamer(ctx *gin.Context, req *streamerproto.ApiFilt return } + constellation := util.TrimZeroWidthUnicode(util.DerefString(req.Constellation)) + req.Constellation = goproto.String(constellation) + //1.从主播用户表中模糊匹配所有主播信息的用户侧数据,按mid正序排序 streameraccts, scores, fullscore, err := _DefaultStreamerAcct.OpFilterStreamerAcct(ctx, &streameracctproto.OpFilterReq{ Age: req.Age, diff --git a/bizcommon/util/util.go b/bizcommon/util/util.go index cddc0d2c..eade0f17 100644 --- a/bizcommon/util/util.go +++ b/bizcommon/util/util.go @@ -419,3 +419,10 @@ func CreateKeywordScriptScoreFunction(name string, value string, weight float64) script := elastic.NewScript(fmt.Sprintf(scriptStr, name, value, 1.0, 0.0)) return elastic.NewScriptFunction(script).Weight(weight) } + +func TrimZeroWidthUnicode(str string) string { + str = strings.Trim(str, "\xe2\x80\x8b") + str = strings.Trim(str, "\xe2\x80\x8c") + str = strings.Trim(str, "\xe2\x80\x8d") + return str +} -- 2.41.0 From afe158e26de2d2465249dd8bfb68740a0b23ebb6 Mon Sep 17 00:00:00 2001 From: Leufolium Date: Thu, 29 Aug 2024 17:32:35 +0800 Subject: [PATCH 11/11] 1 --- app/mix/service/apiservice.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/app/mix/service/apiservice.go b/app/mix/service/apiservice.go index 1cadfb9c..b90125b2 100644 --- a/app/mix/service/apiservice.go +++ b/app/mix/service/apiservice.go @@ -1785,8 +1785,10 @@ func (s *Service) ApiFilterStreamer(ctx *gin.Context, req *streamerproto.ApiFilt return } + city := util.TrimZeroWidthUnicode(util.DerefString(req.City)) constellation := util.TrimZeroWidthUnicode(util.DerefString(req.Constellation)) req.Constellation = goproto.String(constellation) + req.City = goproto.String(city) //1.从主播用户表中模糊匹配所有主播信息的用户侧数据,按mid正序排序 streameraccts, scores, fullscore, err := _DefaultStreamerAcct.OpFilterStreamerAcct(ctx, &streameracctproto.OpFilterReq{ -- 2.41.0