From 41cf08be7c715b6309a9446bbe163807d1590efd Mon Sep 17 00:00:00 2001 From: Leufolium Date: Fri, 21 Jun 2024 19:51:55 +0800 Subject: [PATCH 1/2] by Robin at 20240621 --- api/consts/option.go | 3 ++ app/mix/dao/mongo.go | 21 ++++++++++++++ app/mix/service/logic/accountpunishment.go | 9 ++++++ app/mix/service/service.go | 2 +- app/mix/service/streamer_recomm_service.go | 33 ++++++++++++++++------ app/mix/service/xxljob_tasks.go | 15 +++++++++- dbstruct/accountpunishment.go | 7 +++++ 7 files changed, 80 insertions(+), 10 deletions(-) diff --git a/api/consts/option.go b/api/consts/option.go index 33637af5..bfd22e7d 100644 --- a/api/consts/option.go +++ b/api/consts/option.go @@ -23,6 +23,9 @@ const ( AccountPunishment_BlockFromCreatingFreeZoneMoment = 1 // 禁止发免费空间贴 AccountPunishment_BlockFromCreatingPaidZoneMoment = 2 // 禁止发付费空间贴 AccountPunishment_BlockFromCreatingZoneMoment = 3 // 禁止发空间贴 + AccountPunishment_BlockFromBeingSearched = 4 // 禁止被搜索到 + AccountPunishment_BlockFromBeingDiscovered = 5 // 禁止在推荐被发现 + AccountPunishment_BlockFromBeingSeenAtMoment = 6 // 禁止在广场被发现 ) const ( diff --git a/app/mix/dao/mongo.go b/app/mix/dao/mongo.go index 3fd64a9a..4c608601 100644 --- a/app/mix/dao/mongo.go +++ b/app/mix/dao/mongo.go @@ -3753,6 +3753,27 @@ func (m *Mongo) GetAccountPunishmentListByMidAndType(ctx *gin.Context, mid int64 return accountpunishment, err } +func (m *Mongo) GetAccountPunishmentListByType(ctx *gin.Context, typ int64) ([]*dbstruct.AccountPunishment, error) { + list := make([]*dbstruct.AccountPunishment, 0) + col := m.getColAccountPunishment() + query := qmgo.M{ + "type": typ, + "status": qmgo.M{ + "$ne": consts.AccountPunishment_Interrupted, + }, + "end_time": qmgo.M{ + "$gt": time.Now().Unix(), + }, + "del_flag": 0, + } + err := col.Find(ctx, query).All(&list) + if err == qmgo.ErrNoSuchDocuments { + err = nil + return nil, err + } + return list, err +} + func (m *Mongo) GetTerminatedAccountPunishmentList(ctx *gin.Context, req *accountpunishmentproto.OpListTerminatedReq) ([]*dbstruct.AccountPunishment, error) { list := make([]*dbstruct.AccountPunishment, 0) col := m.getColAccountPunishment() diff --git a/app/mix/service/logic/accountpunishment.go b/app/mix/service/logic/accountpunishment.go index 45403e9f..25d0ed18 100644 --- a/app/mix/service/logic/accountpunishment.go +++ b/app/mix/service/logic/accountpunishment.go @@ -101,3 +101,12 @@ func (p *AccountPunishment) OpListByMidAndType(ctx *gin.Context, mid int64, typ } return accountpunishment, nil } + +func (p *AccountPunishment) OpListByType(ctx *gin.Context, typ int64) ([]*dbstruct.AccountPunishment, error) { + list, err := p.store.GetAccountPunishmentListByType(ctx, typ) + if err != nil { + logger.Error("GetAccountPunishmentListByType fail, err: %v", err) + return nil, err + } + return list, nil +} diff --git a/app/mix/service/service.go b/app/mix/service/service.go index 93fe72ba..e10a7e23 100644 --- a/app/mix/service/service.go +++ b/app/mix/service/service.go @@ -268,7 +268,7 @@ func (s *Service) ConnectToVideoModeration() { // 推荐服务数据库接口 func (s *Service) ConnectToStreamerRecommService(r *StreamerRecommService) { - r.SetStreamerRecommDbService(_DefaultMoment, _DefaultZoneMoment, _DefaultVas, _DefaultStreamer, _DefaultStreamerScore) + r.SetStreamerRecommDbService(_DefaultMoment, _DefaultZoneMoment, _DefaultVas, _DefaultStreamer, _DefaultStreamerScore, _DefaultAccountPunishment) r.SetOut(func(mids []int64) error { err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", mids, 0) if err != nil { diff --git a/app/mix/service/streamer_recomm_service.go b/app/mix/service/streamer_recomm_service.go index 4b4bf758..b8f8c49e 100644 --- a/app/mix/service/streamer_recomm_service.go +++ b/app/mix/service/streamer_recomm_service.go @@ -21,12 +21,13 @@ var ( ) type StreamerRecommService struct { - momentService *logic.Moment - zoneMomentService *logic.ZoneMoment - vasService *logic.Vas - streamerService *logic.Streamer - streamerScoreService *logic.StreamerScore - out func([]int64) error + momentService *logic.Moment + zoneMomentService *logic.ZoneMoment + vasService *logic.Vas + streamerService *logic.Streamer + streamerScoreService *logic.StreamerScore + accountpunishmentService *logic.AccountPunishment + out func([]int64) error formula *apollostruct.StreamerScoreFormulaCfg @@ -38,12 +39,13 @@ func NewStreamerRecommService() *StreamerRecommService { return new(StreamerRecommService) } -func (s *StreamerRecommService) SetStreamerRecommDbService(moment *logic.Moment, zonemoment *logic.ZoneMoment, vas *logic.Vas, streamer *logic.Streamer, streamerScore *logic.StreamerScore) { +func (s *StreamerRecommService) SetStreamerRecommDbService(moment *logic.Moment, zonemoment *logic.ZoneMoment, vas *logic.Vas, streamer *logic.Streamer, streamerScore *logic.StreamerScore, accountpunishment *logic.AccountPunishment) { s.momentService = moment s.zoneMomentService = zonemoment s.vasService = vas s.streamerService = streamer s.streamerScoreService = streamerScore + s.accountpunishmentService = accountpunishment } func (s *StreamerRecommService) SetOut(out func([]int64) error) { @@ -211,10 +213,25 @@ func (s *StreamerRecommService) save() { // 推送 func (s *StreamerRecommService) push() error { + + // 查找禁止在推荐被发现的主播,不将其加入推荐列 + acctpunishments, err := _DefaultAccountPunishment.OpListByType(&gin.Context{}, consts.AccountPunishment_BlockFromBeingDiscovered) + if err != nil { + logger.Error("_DefaultAccountPunishment OpListByType fail, err: %v", err) + } + blockedMp := make(map[int64]*dbstruct.AccountPunishment) + for _, acctpunishment := range acctpunishments { + blockedMp[acctpunishment.GetMid()] = &dbstruct.AccountPunishment{} + } + list := make([]int64, 0) l := make([]*dbstruct.StreamerScore, 0) for i := len(s.scorelist) - 1; i >= 0; i-- { v := s.scorelist[i] + if _, ok := blockedMp[v.Mid]; ok { + continue + } + list = append(list, v.Mid) l = append(l, &dbstruct.StreamerScore{ Id: v.Mid, @@ -229,7 +246,7 @@ func (s *StreamerRecommService) push() error { Score: v.Score, }) } - err := _DefaultStreamerScore.OpSetStreamerScore(&gin.Context{}, l) + err = _DefaultStreamerScore.OpSetStreamerScore(&gin.Context{}, l) if err != nil { logger.Error("OpSetStreamerScore fail, err: %v", err) } diff --git a/app/mix/service/xxljob_tasks.go b/app/mix/service/xxljob_tasks.go index 4235b455..1c3d05fb 100644 --- a/app/mix/service/xxljob_tasks.go +++ b/app/mix/service/xxljob_tasks.go @@ -461,6 +461,16 @@ func (s *CronService) ReloadMomentRecommList(ctx context.Context, param *xxl.Run return fmt.Sprintf("OpGetMomentListByMids fail, err: %v", err) } + // 查找禁止在广场被发现的主播,不将其加入推荐列 + acctpunishments, err := _DefaultAccountPunishment.OpListByType(&gin.Context{}, consts.AccountPunishment_BlockFromBeingSeenAtMoment) + if err != nil { + logger.Error("_DefaultAccountPunishment OpListByType fail, err: %v", err) + } + blockedMp := make(map[int64]*dbstruct.AccountPunishment) + for _, acctpunishment := range acctpunishments { + blockedMp[acctpunishment.GetMid()] = &dbstruct.AccountPunishment{} + } + // 清缓存 if err := redis.GetRedisClient().Del(consts.RedisMomentPrefix + "recomm_list"); err != nil { logger.Error("Del redis cache fail, err: %v", err) @@ -477,7 +487,10 @@ func (s *CronService) ReloadMomentRecommList(ctx context.Context, param *xxl.Run // 加载缓存 for _, moment := range list { - id := util.DerefInt64(moment.Id) + if _, ok := blockedMp[moment.GetMid()]; ok { + continue + } + id := moment.GetId() err := redis.GetRedisClient().RPush(consts.RedisMomentPrefix+"recomm_list", id) if err != nil { logger.Error("Redis cache fail, err: %v", err) diff --git a/dbstruct/accountpunishment.go b/dbstruct/accountpunishment.go index 9588743a..96ea451b 100644 --- a/dbstruct/accountpunishment.go +++ b/dbstruct/accountpunishment.go @@ -19,6 +19,13 @@ type AccountPunishment struct { DelFlag *int64 `json:"del_flag" bson:"del_flag"` // 删除标记 } +func (p *AccountPunishment) GetMid() int64 { + if p == nil || p.Mid == nil { + return 0 + } + return *p.Mid +} + func (p *AccountPunishment) GetEndTimeFormatString() string { if p == nil || p.EndTime == nil { return "" From 177a95ac357cdba17a54abbf1446ace3c7250c43 Mon Sep 17 00:00:00 2001 From: Leufolium Date: Fri, 21 Jun 2024 20:28:40 +0800 Subject: [PATCH 2/2] by Robin at 20240621 --- .../streamer_acct/proto/streamer_acct_op.go | 18 +++++----- app/mix/dao/elasticsearch.go | 8 ++--- app/mix/service/apiservice.go | 35 ++++++++++++++----- app/mix/service/cronservice.go | 1 + app/mix/service/xxljob_tasks.go | 28 +++++++++++++++ 5 files changed, 70 insertions(+), 20 deletions(-) diff --git a/api/proto/streamer_acct/proto/streamer_acct_op.go b/api/proto/streamer_acct/proto/streamer_acct_op.go index 5e3ec58a..85d62830 100644 --- a/api/proto/streamer_acct/proto/streamer_acct_op.go +++ b/api/proto/streamer_acct/proto/streamer_acct_op.go @@ -5,10 +5,11 @@ import "service/api/base" // 根据UserId模糊查询(和姓名取并集) type OpListFuzzilyByUserIdReq struct { base.BaseRequest - UserIdString string `json:"user_id_string"` //user_id模糊匹配 - Offset int `json:"offset"` - Limit int `json:"limit"` - Sort []string + UserIdString string `json:"user_id_string"` //user_id模糊匹配 + Offset int `json:"offset"` + Limit int `json:"limit"` + Sort []string + BlockedFromBeingSearchedList []string } type OpListFuzzilyByUserIdData struct { @@ -22,10 +23,11 @@ type OpListFuzzilyByUserIdResp struct { // 根据Name模糊查询 type OpListFuzzilyByNameReq struct { base.BaseRequest - Name string `json:"name"` //name模糊匹配 - Offset int `json:"offset"` - Limit int `json:"limit"` - Sort []string + Name string `json:"name"` //name模糊匹配 + Offset int `json:"offset"` + Limit int `json:"limit"` + Sort []string + BlockedFromBeingSearchedList []string } type OpListFuzzilyByNameData struct { diff --git a/app/mix/dao/elasticsearch.go b/app/mix/dao/elasticsearch.go index 1c0ff038..9db99118 100644 --- a/app/mix/dao/elasticsearch.go +++ b/app/mix/dao/elasticsearch.go @@ -89,9 +89,9 @@ func (es *ElasticSearch) GetStreamerAcctListFuzzilyByName(ctx *gin.Context, req pinyinClause = elastic.NewMatchPhraseQuery("pinyin.long_char", req.Name) } orClause := elastic.NewBoolQuery().Should(nameClause, pinyinClause) - query := elastic.NewBoolQuery().Must() delFlagClause := elastic.NewMatchQuery("del_flag", 0) - query.Must(orClause, delFlagClause) + mustnotClause := elastic.NewTermsQueryFromStrings("_id", req.BlockedFromBeingSearchedList...) + query := elastic.NewBoolQuery().Must(orClause, delFlagClause).MustNot(mustnotClause) res, err := es.clientMix.Search(es.getIndexStreamerAcct()).Query(query).From(req.Offset).Size(req.Limit).Sort("ct", true).Do(ctx) if err != nil { @@ -135,9 +135,9 @@ func (es *ElasticSearch) GetStreamerAcctListFuzzilyByUserId(ctx *gin.Context, re userIdStringClause = elastic.NewMatchPhraseQuery("user_id_string.long_char", req.UserIdString) } orClause := elastic.NewBoolQuery().Should(nameClause, userIdStringClause) - query := elastic.NewBoolQuery().Must() delFlagClause := elastic.NewMatchQuery("del_flag", 0) - query.Must(orClause, delFlagClause) + mustnotClause := elastic.NewTermsQueryFromStrings("_id", req.BlockedFromBeingSearchedList...) + query := elastic.NewBoolQuery().Must(orClause, delFlagClause).MustNot(mustnotClause) res, err := es.clientMix.Search(es.getIndexStreamerAcct()).Query(query).From(req.Offset).Size(req.Limit).Sort("ct", true).Do(ctx) if err != nil { diff --git a/app/mix/service/apiservice.go b/app/mix/service/apiservice.go index b6b96d69..fb7f6999 100644 --- a/app/mix/service/apiservice.go +++ b/app/mix/service/apiservice.go @@ -37,6 +37,7 @@ import ( "service/dbstruct" "service/library/apollo" "service/library/logger" + "service/library/redis" "strings" "time" @@ -1226,12 +1227,21 @@ func (s *Service) ApiGetStreamerExtListFuzzilyByUserId(ctx *gin.Context, req *st ec = errcode.ErrCodeStreamerSrvOk + // 从redis中获取当前禁止被搜索的主播 + blockedlist := make([]string, 0) + err := redis.GetRedisClient().GetObject(consts.RedisStreamerPrefix+"blocked_from_being_searched_list", &blockedlist) + if err != nil { + logger.Error("Redis read failed : %v", err) + return + } + //1.从主播用户表中模糊匹配所有主播信息的用户侧数据,按mid正序排序 streameraccts, err := _DefaultStreamerAcct.OpListStreamerAcctFuzzilyByUserId(ctx, &streameracctproto.OpListFuzzilyByUserIdReq{ - UserIdString: fmt.Sprint(util.DerefInt64(req.UserId)), - Offset: req.Offset, - Limit: req.Limit, - Sort: []string{"_id"}, + UserIdString: fmt.Sprint(util.DerefInt64(req.UserId)), + Offset: req.Offset, + Limit: req.Limit, + Sort: []string{"_id"}, + BlockedFromBeingSearchedList: blockedlist, }) if err != nil { logger.Error("StreamerAcct OpListFuzzilyByUserId fail, req: %v, err: %v", util.ToJson(req), err) @@ -1263,12 +1273,21 @@ func (s *Service) ApiGetStreamerExtListFuzzilyByName(ctx *gin.Context, req *stre ec = errcode.ErrCodeStreamerSrvOk + // 从redis中获取当前禁止被搜索的主播 + blockedlist := make([]string, 0) + err := redis.GetRedisClient().GetObject(consts.RedisStreamerPrefix+"blocked_from_being_searched_list", &blockedlist) + if err != nil { + logger.Error("Redis read failed : %v", err) + return + } + //1.从主播用户表中模糊匹配所有主播信息的用户侧数据,按mid正序排序 streameraccts, err := _DefaultStreamerAcct.OpListStreamerAcctFuzzilyByName(ctx, &streameracctproto.OpListFuzzilyByNameReq{ - Name: req.Name, - Offset: req.Offset, - Limit: req.Limit, - Sort: []string{"_id"}, + Name: req.Name, + Offset: req.Offset, + Limit: req.Limit, + Sort: []string{"_id"}, + BlockedFromBeingSearchedList: blockedlist, }) if err != nil { logger.Error("Account OpListFuzzilyByName fail, req: %v, err: %v", util.ToJson(req), err) diff --git a/app/mix/service/cronservice.go b/app/mix/service/cronservice.go index a56316cc..b1c58abf 100644 --- a/app/mix/service/cronservice.go +++ b/app/mix/service/cronservice.go @@ -84,6 +84,7 @@ func (s *CronService) Init(c any) (exec xxl.Executor, err error) { exec.RegTask("video_moderation_batch", s.VideoModerationBatch) exec.RegTask("video_moderation_batch_his", s.VideoModerationBatchHis) exec.RegTask("clear_expired_btcb", s.ClearExpiredBtcb) + exec.RegTask("reload_blocked_from_being_searched_list", s.ReloadBlockedFromBeingSearchedList) exec.LogHandler(customLogHandle) //注册任务handler diff --git a/app/mix/service/xxljob_tasks.go b/app/mix/service/xxljob_tasks.go index 1c3d05fb..899f8fa0 100644 --- a/app/mix/service/xxljob_tasks.go +++ b/app/mix/service/xxljob_tasks.go @@ -580,3 +580,31 @@ func (s *CronService) ClearExpiredBtcb(ctx context.Context, param *xxl.RunReq) ( logger.Info("Clear expired btcb ends...") return "Clear expired btcb ends..." } + +func (s *CronService) ReloadBlockedFromBeingSearchedList(ctx context.Context, param *xxl.RunReq) (msg string) { + logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID)) + logger.Info("Refreshing blocked-from-being-searched list cached in redis...") + + // 查找禁止在搜索被发现的主播 + list, err := _DefaultAccountPunishment.OpListByType(&gin.Context{}, consts.AccountPunishment_BlockFromBeingSearched) + if err != nil { + logger.Error("_DefaultAccountPunishment OpListByType fail, err: %v", err) + } + // 清缓存 + if err := redis.GetRedisClient().Del(consts.RedisStreamerPrefix + "blocked_from_being_searched_list"); err != nil { + logger.Error("Del redis cache fail, err: %v", err) + return fmt.Sprintf("Del redis cache fail, err: %v", err) + } + + // 加载缓存 + for _, acctpunishment := range list { + err := redis.GetRedisClient().RPush(consts.RedisStreamerPrefix+"blocked_from_being_searched_list", fmt.Sprint(acctpunishment.GetMid())) + if err != nil { + logger.Error("Redis cache fail, err: %v", err) + return fmt.Sprintf("Redis cache fail, err: %v", err) + } + } + + logger.Info("Refresh blocked-from-being-searched list cached in redis accomplished...") + return "Refresh blocked-from-being-searched list cached in redis accomplished" +}