From c9e6889aa2afb7c97f4e2c058cbbd3ed494e4f44 Mon Sep 17 00:00:00 2001 From: Leufolium Date: Thu, 30 May 2024 14:53:38 +0800 Subject: [PATCH] by Robin at 20240530 --- apollostruct/streamer_score_formula.go | 51 +++++ app/mix/cmd_offline/main.go | 4 + app/mix/service/logic/moment.go | 11 + app/mix/service/logic/vas.go | 4 + app/mix/service/logic/vas_zone.go | 5 + app/mix/service/logic/zonemoment.go | 11 + app/mix/service/service.go | 12 ++ app/mix/service/streamer_recomm_service.go | 226 +++++++++++++++++---- app/mix/service/xxljob_tasks.go | 19 +- bizcommon/util/util.go | 11 + 10 files changed, 301 insertions(+), 53 deletions(-) create mode 100644 apollostruct/streamer_score_formula.go diff --git a/apollostruct/streamer_score_formula.go b/apollostruct/streamer_score_formula.go new file mode 100644 index 00000000..5b2f16a5 --- /dev/null +++ b/apollostruct/streamer_score_formula.go @@ -0,0 +1,51 @@ +package apollostruct + +import "sort" + +type FormulaParam struct { + Param string `json:"param"` + Maximum float64 `json:"maximum"` + Proportion float64 `json:"proportion"` +} + +type ByProportion []*FormulaParam + +func (b ByProportion) Len() int { + return len(b) +} +func (b ByProportion) Less(i, j int) bool { + if b[i].Proportion < b[j].Proportion { + return true + } else { + return false + } +} +func (b ByProportion) Swap(i, j int) { + b[i], b[j] = b[j], b[i] +} + +type StreamerScoreFormulaCfg struct { + ZoneMomentCountInThreeDays *FormulaParam `json:"zone_moment_count_in_three_days"` + ZoneMomentCountInAMonth *FormulaParam `json:"zone_moment_count_in_a_month"` + IncomeInAWeek *FormulaParam `json:"income_in_a_week"` + NewZoneMemberCountInThreeDays *FormulaParam `json:"new_zone_member_count_in_three_days"` + NewZoneMemberCountInAMonth *FormulaParam `json:"new_zone_member_count_in_a_month"` + MomentCountInThreeDays *FormulaParam `json:"moment_count_in_three_days"` + + Priority []string +} + +func (s *StreamerScoreFormulaCfg) CalPriority() { + list := make(ByProportion, 0) + list = append(list, s.ZoneMomentCountInThreeDays) + list = append(list, s.ZoneMomentCountInAMonth) + list = append(list, s.IncomeInAWeek) + list = append(list, s.NewZoneMemberCountInThreeDays) + list = append(list, s.NewZoneMemberCountInAMonth) + list = append(list, s.MomentCountInThreeDays) + sort.Sort(list) + s.Priority = make([]string, 0) + for _, formula := range list { + s.Priority = append(s.Priority, formula.Param) + } +} diff --git a/app/mix/cmd_offline/main.go b/app/mix/cmd_offline/main.go index 711074e6..efb4a8c0 100644 --- a/app/mix/cmd_offline/main.go +++ b/app/mix/cmd_offline/main.go @@ -78,6 +78,7 @@ func main() { service.DefaultImageAuditTaskResultHandler = service.NewImageAuditTaskResultHandler() service.DefaultTextAuditTaskResultHandler = service.NewTextAuditTaskResultHandler() service.DefaultVideoModerationTaskResultHandler = service.NewVideoModerationTaskResultHandler() + service.DefaultStreamerRecommService = service.NewStreamerRecommService() err = service.DefaultService.Init(cfg) if err != nil { msg := fmt.Sprintf("Service init fail, err: %v", err) @@ -94,6 +95,9 @@ func main() { service.DefaultService.ConnectToTextAudit() service.DefaultService.ConnectToVideoModeration() + // 连接到推荐服务 + service.DefaultService.ConnectToStreamerRecommService(service.DefaultStreamerRecommService) + // 启动图像审核服务 imageaudit.Init(cfg.ImageAudit) diff --git a/app/mix/service/logic/moment.go b/app/mix/service/logic/moment.go index 38ac545a..0ce85953 100644 --- a/app/mix/service/logic/moment.go +++ b/app/mix/service/logic/moment.go @@ -166,3 +166,14 @@ func (p *Moment) GetMomentStringIdMapByIds(ctx *gin.Context, momentIdStrs []stri return mp, nil } + +func (p *Moment) GetMomentStatInfoList(ctx *gin.Context, st, et int64) ([]*dbstruct.MomentStatInfo, error) { + + list, err := p.store.GetMomentCountByTimeSpanGroupByMid(ctx, st, et) + if err != nil { + logger.Error("GetMomentCountByTimeSpanGroupByMid fail, err: %v", err) + return make([]*dbstruct.MomentStatInfo, 0), err + } + + return list, nil +} diff --git a/app/mix/service/logic/vas.go b/app/mix/service/logic/vas.go index be1b7d49..17636c44 100644 --- a/app/mix/service/logic/vas.go +++ b/app/mix/service/logic/vas.go @@ -3478,3 +3478,7 @@ func (v *Vas) GetIncomeList(ctx *gin.Context, tx *sqlx.Tx, mid, st, et int64) (l func (v *Vas) GetTotalFinishIncome(ctx *gin.Context, tx *sqlx.Tx, mid int64, orderIds []string) (int64, error) { return v.store.GetTotalFinishIncome(ctx, tx, mid, orderIds) } + +func (v *Vas) GetIncomeByTimeSpanGroupByMid(ctx *gin.Context, tx *sqlx.Tx, st, et int64) ([]*dbstruct.StreamerProfit, error) { + return v.store.GetIncomeByTimeSpanGroupByMid(ctx, tx, st, et) +} diff --git a/app/mix/service/logic/vas_zone.go b/app/mix/service/logic/vas_zone.go index 1cc584c5..ba53d1d6 100644 --- a/app/mix/service/logic/vas_zone.go +++ b/app/mix/service/logic/vas_zone.go @@ -1224,3 +1224,8 @@ func (v *Vas) GetLastHourZoneProfit(ctx *gin.Context, st, et int64) ([]*dbstruct return zoneprofits, zonerefunds, nil } + +// 统计时段内空间进入人数 +func (v *Vas) GetLastHourZoneAdmissionInfo(ctx *gin.Context, tx *sqlx.Tx, st, et int64) ([]*dbstruct.ZoneAdmissionInfo, error) { + return v.store.GetLastHourZoneAdmissionInfo(ctx, tx, st, et) +} diff --git a/app/mix/service/logic/zonemoment.go b/app/mix/service/logic/zonemoment.go index dcaabd91..cf967e00 100644 --- a/app/mix/service/logic/zonemoment.go +++ b/app/mix/service/logic/zonemoment.go @@ -173,3 +173,14 @@ func (p *ZoneMoment) OpCountByMidAndStatus(ctx *gin.Context, mid int64, status i } return count, nil } + +func (p *ZoneMoment) GetZoneMomentStatInfoList(ctx *gin.Context, st, et int64) ([]*dbstruct.ZoneMomentStatInfo, error) { + + list, err := p.store.GetZoneMomentCountByTimeSpanGroupByMid(ctx, st, et) + if err != nil { + logger.Error("GetMomentCountByTimeSpanGroupByMid fail, err: %v", err) + return make([]*dbstruct.ZoneMomentStatInfo, 0), err + } + + return list, nil +} diff --git a/app/mix/service/service.go b/app/mix/service/service.go index 01a5afad..59387bf5 100644 --- a/app/mix/service/service.go +++ b/app/mix/service/service.go @@ -260,6 +260,18 @@ func (s *Service) ConnectToVideoModeration() { videomoderation.ConnectToContentAuditRTIService(_DefaultContentAuditRTI) } +// 推荐服务数据库接口 +func (s *Service) ConnectToStreamerRecommService(r *StreamerRecommService) { + r.SetStreamerRecommDbService(_DefaultMoment, _DefaultZoneMoment, _DefaultVas) + r.SetOut(func(mids []int64) error { + err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", mids, 0) + if err != nil { + logger.Info("redis cache fail: %v", err) + } + return err + }) +} + // Product func (s *Service) OpCreateProduct(ctx *gin.Context, req *productproto.OpCreateReq) (ec errcode.ErrCode) { ec = errcode.ErrCodeProductSrvOk diff --git a/app/mix/service/streamer_recomm_service.go b/app/mix/service/streamer_recomm_service.go index f02296e1..aee77676 100644 --- a/app/mix/service/streamer_recomm_service.go +++ b/app/mix/service/streamer_recomm_service.go @@ -1,36 +1,100 @@ package service import ( - "service/dbstruct" + "fmt" + "math" + "service/api/consts" + "service/apollostruct" + "service/app/mix/service/logic" + "service/library/apollo" "service/library/logger" + "sort" "github.com/gin-gonic/gin" ) -type StreamerScore struct { - Mid int64 - ZoneMomentCountInThreeDays int64 - ZoneMomentCountInAMonth int64 - IncomeInAWeek int64 - NewZoneMemberCountInThreeDays int64 - NewZoneMemberCountInAMonth int64 - MomentCountInThreeDays int64 - RefundRate float64 - Score int64 -} +var ( + DefaultStreamerRecommService *StreamerRecommService +) type StreamerRecommService struct { - RecvZoneMomentStatInfo func(ctx *gin.Context, st, et int64) ([]*dbstruct.ZoneMomentStatInfo, error) - RecvMomentStatInfo func(ctx *gin.Context, st, et int64) ([]*dbstruct.MomentStatInfo, error) - RecvStreamerProfit func(ctx *gin.Context, st, et int64) ([]*dbstruct.StreamerProfit, error) - RecvZoneAdmissionInfo func(ctx *gin.Context, st, et int64) ([]*dbstruct.ZoneAdmissionInfo, error) + momentService *logic.Moment + zoneMomentService *logic.ZoneMoment + vasService *logic.Vas + out func([]int64) error - scores map[int64]*StreamerScore + formula *apollostruct.StreamerScoreFormulaCfg + + scores map[int64]*StreamerScore + scorelist ByScore +} + +func NewStreamerRecommService() *StreamerRecommService { + return new(StreamerRecommService) +} + +func (s *StreamerRecommService) SetStreamerRecommDbService(moment *logic.Moment, zonemoment *logic.ZoneMoment, vas *logic.Vas) { + s.momentService = moment + s.zoneMomentService = zonemoment + s.vasService = vas +} + +func (s *StreamerRecommService) SetOut(out func([]int64) error) { + s.out = out } // 启动跑批 -func (s *StreamerRecommService) RunBatch() { +func (s *StreamerRecommService) RunBatch(nt int64) error { + ctx := &gin.Context{} + // 获取计算公式 + s.formula = &apollostruct.StreamerScoreFormulaCfg{} + err := apollo.GetJson(consts.ReferentialZoneMomentKey, s.formula, apollo.ApolloOpts().SetNamespace("application")) + if err != nil { + logger.Error("Apollo read failed : %v", err) + s.clear() + return fmt.Errorf("apollo read failed : %v", err) + } + // 获取优先级 + s.formula.CalPriority() + + // 取数 + err = s.recvMsg(ctx, nt) + if err != nil { + logger.Error("recvMsg fail, err: %v", err) + s.clear() + return err + } + + // 打分 + err = s.score() + if err != nil { + logger.Error("score fail, err: %v", err) + s.clear() + return err + } + + // 排序 + s.sort() + + // 推数 + err = s.push() + if err != nil { + logger.Error("push fail, err: %v", err) + s.clear() + return err + } + + // 清空 + s.clear() + + return nil + +} + +func (s *StreamerRecommService) clear() { + s.scorelist = make([]*StreamerScore, 0) + s.scores = make(map[int64]*StreamerScore) } // 取数 @@ -55,9 +119,9 @@ func (s *StreamerRecommService) recvMsg(ctx *gin.Context, nt int64) (err error) } // 七日内消费金额 - err = s.recvStreamerProfit(ctx, beforeAWeek, nt) + err = s.recvStreamerProfitInAWeek(ctx, beforeAWeek, nt) if err != nil { - logger.Error("recvStreamerProfit fail, err: %v", err) + logger.Error("recvStreamerProfitInAWeek fail, err: %v", err) return } @@ -85,27 +149,40 @@ func (s *StreamerRecommService) recvMsg(ctx *gin.Context, nt int64) (err error) return } -// 运算 -func (s *ScriptsService) score() { +// 打分 +func (s *StreamerRecommService) score() error { + for _, v := range s.scores { + v.CalScore(s.formula) + v.SetPriority(s.formula.Priority) + } + return nil +} +// 排序 +func (s *StreamerRecommService) sort() { + sort.Sort(s.scorelist) } // 转存 -func (s *ScriptsService) save() { +func (s *StreamerRecommService) save() { } // 推送 -func (s *ScriptsService) push() { - +func (s *StreamerRecommService) push() error { + list := make([]int64, 0) + for _, v := range s.scorelist { + list = append(list, v.Mid) + } + return s.out(list) } // 取数 func (s *StreamerRecommService) recvZoneMomentStatInfoInThreeDays(ctx *gin.Context, st, et int64) (err error) { - zoneMomentStatInfoList, err := s.RecvZoneMomentStatInfo(ctx, st, et) + zoneMomentStatInfoList, err := s.zoneMomentService.GetZoneMomentStatInfoList(ctx, st, et) if err != nil { - logger.Error("RecvZoneMomentStatInfo in three days fail, err: %v", err) + logger.Error("GetZoneMomentStatInfoList in three days fail, err: %v", err) return } @@ -123,9 +200,9 @@ func (s *StreamerRecommService) recvZoneMomentStatInfoInThreeDays(ctx *gin.Conte func (s *StreamerRecommService) recvZoneMomentStatInfoInAMonth(ctx *gin.Context, st, et int64) (err error) { - zoneMomentStatInfoList, err := s.RecvZoneMomentStatInfo(ctx, st, et) + zoneMomentStatInfoList, err := s.zoneMomentService.GetZoneMomentStatInfoList(ctx, st, et) if err != nil { - logger.Error("RecvZoneMomentStatInfo in three days fail, err: %v", err) + logger.Error("GetZoneMomentStatInfoList in three days fail, err: %v", err) return } @@ -141,11 +218,11 @@ func (s *StreamerRecommService) recvZoneMomentStatInfoInAMonth(ctx *gin.Context, return } -func (s *StreamerRecommService) recvStreamerProfit(ctx *gin.Context, st, et int64) (err error) { +func (s *StreamerRecommService) recvStreamerProfitInAWeek(ctx *gin.Context, st, et int64) (err error) { - streamerProfitList, err := s.RecvStreamerProfit(ctx, st, et) + streamerProfitList, err := s.vasService.GetIncomeByTimeSpanGroupByMid(ctx, nil, st, et) if err != nil { - logger.Error("RecvStreamerProfit fail, err: %v", err) + logger.Error("GetIncomeByTimeSpanGroupByMid fail, err: %v", err) return } @@ -163,9 +240,9 @@ func (s *StreamerRecommService) recvStreamerProfit(ctx *gin.Context, st, et int6 func (s *StreamerRecommService) recvZoneAdmissionInfoInThreeDays(ctx *gin.Context, st, et int64) (err error) { - zoneAdmissionInfoList, err := s.RecvZoneAdmissionInfo(ctx, st, et) + zoneAdmissionInfoList, err := s.vasService.GetLastHourZoneAdmissionInfo(ctx, nil, st, et) if err != nil { - logger.Error("RecvZoneAdmissionInfo in three days fail, err: %v", err) + logger.Error("GetLastHourZoneAdmissionInfo in three days fail, err: %v", err) return } @@ -183,9 +260,9 @@ func (s *StreamerRecommService) recvZoneAdmissionInfoInThreeDays(ctx *gin.Contex func (s *StreamerRecommService) recvZoneAdmissionInfoInAMonth(ctx *gin.Context, st, et int64) (err error) { - zoneAdmissionInfoList, err := s.RecvZoneAdmissionInfo(ctx, st, et) + zoneAdmissionInfoList, err := s.vasService.GetLastHourZoneAdmissionInfo(ctx, nil, st, et) if err != nil { - logger.Error("RecvZoneAdmissionInfo in three days fail, err: %v", err) + logger.Error("GetLastHourZoneAdmissionInfo in three days fail, err: %v", err) return } @@ -203,9 +280,9 @@ func (s *StreamerRecommService) recvZoneAdmissionInfoInAMonth(ctx *gin.Context, func (s *StreamerRecommService) recvMomentStatInfoInThreeDays(ctx *gin.Context, st, et int64) (err error) { - momentStatInfoList, err := s.RecvMomentStatInfo(ctx, st, et) + momentStatInfoList, err := s.momentService.GetMomentStatInfoList(ctx, st, et) if err != nil { - logger.Error("RecvMomentStatInfo in three days fail, err: %v", err) + logger.Error("GetMomentStatInfoList in three days fail, err: %v", err) return } @@ -220,3 +297,76 @@ func (s *StreamerRecommService) recvMomentStatInfoInThreeDays(ctx *gin.Context, } return } + +/* + *@author Robin + *@class 主播打分 + */ +type StreamerScore struct { + Mid int64 + ZoneMomentCountInThreeDays int64 + ZoneMomentCountInAMonth int64 + IncomeInAWeek int64 + NewZoneMemberCountInThreeDays int64 + NewZoneMemberCountInAMonth int64 + MomentCountInThreeDays int64 + RefundRate float64 + Score float64 + + Priorities []float64 +} + +func (s *StreamerScore) SetPriority(names []string) { + s.Priorities = make([]float64, 0) + s.Priorities = append(s.Priorities, s.Score) + for _, name := range names { + switch name { + case "zone_moment_count_in_three_days": + s.Priorities = append(s.Priorities, float64(s.ZoneMomentCountInThreeDays)) + case "zone_moment_count_in_a_month": + s.Priorities = append(s.Priorities, float64(s.ZoneMomentCountInAMonth)) + case "income_in_a_week": + s.Priorities = append(s.Priorities, float64(s.IncomeInAWeek)) + case "new_zone_member_count_in_three_days": + s.Priorities = append(s.Priorities, float64(s.NewZoneMemberCountInThreeDays)) + case "new_zone_member_count_in_a_month": + s.Priorities = append(s.Priorities, float64(s.NewZoneMemberCountInAMonth)) + case "moment_count_in_three_days": + s.Priorities = append(s.Priorities, float64(s.MomentCountInThreeDays)) + } + } +} + +func (s *StreamerScore) CalScore(formula *apollostruct.StreamerScoreFormulaCfg) { + s.Score += math.Min(100, float64(s.ZoneMomentCountInThreeDays)/formula.ZoneMomentCountInThreeDays.Maximum*100) * formula.ZoneMomentCountInThreeDays.Proportion + s.Score += math.Min(100, float64(s.ZoneMomentCountInAMonth)/formula.ZoneMomentCountInAMonth.Maximum*100) * formula.ZoneMomentCountInAMonth.Proportion + s.Score += math.Min(100, float64(s.IncomeInAWeek)/formula.IncomeInAWeek.Maximum*100) * formula.IncomeInAWeek.Proportion + s.Score += math.Min(100, float64(s.NewZoneMemberCountInThreeDays)/formula.NewZoneMemberCountInThreeDays.Maximum*100) * formula.NewZoneMemberCountInThreeDays.Proportion + s.Score += math.Min(100, float64(s.NewZoneMemberCountInAMonth)/formula.NewZoneMemberCountInAMonth.Maximum*100) * formula.NewZoneMemberCountInAMonth.Proportion + s.Score += math.Min(100, float64(s.MomentCountInThreeDays)/formula.MomentCountInThreeDays.Maximum*100) * formula.MomentCountInThreeDays.Proportion +} + +type ByScore []*StreamerScore + +func (b ByScore) Len() int { + return len(b) +} +func (b ByScore) Less(i, j int) bool { + range1 := len(b[i].Priorities) + range2 := len(b[j].Priorities) + minRange := range1 + if range1 > range2 { + minRange = range2 + } + for k := 0; k < minRange; k++ { + if b[i].Priorities[k] < b[j].Priorities[k] { + return true + } else if b[i].Priorities[k] > b[j].Priorities[k] { + return false + } + } + return false +} +func (b ByScore) Swap(i, j int) { + b[i], b[j] = b[j], b[i] +} diff --git a/app/mix/service/xxljob_tasks.go b/app/mix/service/xxljob_tasks.go index 77df357b..8df27bad 100644 --- a/app/mix/service/xxljob_tasks.go +++ b/app/mix/service/xxljob_tasks.go @@ -10,7 +10,6 @@ import ( daily_statementproto "service/api/proto/daily_statement/proto" daily_statement_zone_infoproto "service/api/proto/daily_statement_zone_info/proto" momentproto "service/api/proto/moment/proto" - streamerproto "service/api/proto/streamer/proto" vasproto "service/api/proto/vas/proto" vericodeproto "service/api/proto/vericode/proto" "service/apollostruct" @@ -35,22 +34,12 @@ import ( func (s *CronService) ReloadRecommList(ctx context.Context, param *xxl.RunReq) (msg string) { logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID)) logger.Info("Refreshing recommendation list cached in redis...") - list, err := _DefaultStreamer.OpList(&gin.Context{}, &streamerproto.OpListReq{ - Sort: []string{"-fans"}, - }) - if err != nil { - logger.Error("OpList fail, err: %v", err) - return fmt.Sprintf("OpList fail, err: %v", err) - } - midList := make([]int64, len(list)) - for i, streamer := range list { - midList[i] = util.DerefInt64(streamer.Mid) - } - err = redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", midList, 0) + nt := util.GetHourHalfTimeStamp(time.Now()) + err := DefaultStreamerRecommService.RunBatch(nt) if err != nil { - logger.Error("Redis cache fail, err: %v", err) - return fmt.Sprintf("Redis cache fail, err: %v", err) + logger.Error("RunBatch fail, err: %v", err) + return fmt.Sprintf("RunBatch fail, err: %v", err) } logger.Info("Refresh recommendation list cached in redis accomplished...") return "Refresh recommendation list cached in redis accomplished" diff --git a/bizcommon/util/util.go b/bizcommon/util/util.go index bc694380..4604f20e 100644 --- a/bizcommon/util/util.go +++ b/bizcommon/util/util.go @@ -90,6 +90,17 @@ func GetHourStartTimeStamp(t time.Time) int64 { return duetimecst.Unix() } +// 获取30分时间戳 +func GetHourHalfTimeStamp(t time.Time) int64 { + loc, _ := time.LoadLocation("Asia/Shanghai") + timeStr := fmt.Sprintf("%02d-%02d-%02d %02d:30:00", t.Year(), t.Month(), t.Day(), t.Hour()) + duetimecst, err := time.ParseInLocation("2006-1-2 15:04:05", timeStr, loc) + if err != nil { + logger.Error("parse error : %v", err) + } + return duetimecst.Unix() +} + // 获取整分时间戳 func GetMinuteStartTimeStamp(t time.Time) int64 { loc, _ := time.LoadLocation("Asia/Shanghai")