by Robin at 20240530

This commit is contained in:
Leufolium 2024-05-30 14:53:38 +08:00
parent d9d60f69a6
commit c9e6889aa2
10 changed files with 301 additions and 53 deletions

View File

@ -0,0 +1,51 @@
package apollostruct
import "sort"
type FormulaParam struct {
Param string `json:"param"`
Maximum float64 `json:"maximum"`
Proportion float64 `json:"proportion"`
}
type ByProportion []*FormulaParam
func (b ByProportion) Len() int {
return len(b)
}
func (b ByProportion) Less(i, j int) bool {
if b[i].Proportion < b[j].Proportion {
return true
} else {
return false
}
}
func (b ByProportion) Swap(i, j int) {
b[i], b[j] = b[j], b[i]
}
type StreamerScoreFormulaCfg struct {
ZoneMomentCountInThreeDays *FormulaParam `json:"zone_moment_count_in_three_days"`
ZoneMomentCountInAMonth *FormulaParam `json:"zone_moment_count_in_a_month"`
IncomeInAWeek *FormulaParam `json:"income_in_a_week"`
NewZoneMemberCountInThreeDays *FormulaParam `json:"new_zone_member_count_in_three_days"`
NewZoneMemberCountInAMonth *FormulaParam `json:"new_zone_member_count_in_a_month"`
MomentCountInThreeDays *FormulaParam `json:"moment_count_in_three_days"`
Priority []string
}
func (s *StreamerScoreFormulaCfg) CalPriority() {
list := make(ByProportion, 0)
list = append(list, s.ZoneMomentCountInThreeDays)
list = append(list, s.ZoneMomentCountInAMonth)
list = append(list, s.IncomeInAWeek)
list = append(list, s.NewZoneMemberCountInThreeDays)
list = append(list, s.NewZoneMemberCountInAMonth)
list = append(list, s.MomentCountInThreeDays)
sort.Sort(list)
s.Priority = make([]string, 0)
for _, formula := range list {
s.Priority = append(s.Priority, formula.Param)
}
}

View File

@ -78,6 +78,7 @@ func main() {
service.DefaultImageAuditTaskResultHandler = service.NewImageAuditTaskResultHandler()
service.DefaultTextAuditTaskResultHandler = service.NewTextAuditTaskResultHandler()
service.DefaultVideoModerationTaskResultHandler = service.NewVideoModerationTaskResultHandler()
service.DefaultStreamerRecommService = service.NewStreamerRecommService()
err = service.DefaultService.Init(cfg)
if err != nil {
msg := fmt.Sprintf("Service init fail, err: %v", err)
@ -94,6 +95,9 @@ func main() {
service.DefaultService.ConnectToTextAudit()
service.DefaultService.ConnectToVideoModeration()
// 连接到推荐服务
service.DefaultService.ConnectToStreamerRecommService(service.DefaultStreamerRecommService)
// 启动图像审核服务
imageaudit.Init(cfg.ImageAudit)

View File

@ -166,3 +166,14 @@ func (p *Moment) GetMomentStringIdMapByIds(ctx *gin.Context, momentIdStrs []stri
return mp, nil
}
func (p *Moment) GetMomentStatInfoList(ctx *gin.Context, st, et int64) ([]*dbstruct.MomentStatInfo, error) {
list, err := p.store.GetMomentCountByTimeSpanGroupByMid(ctx, st, et)
if err != nil {
logger.Error("GetMomentCountByTimeSpanGroupByMid fail, err: %v", err)
return make([]*dbstruct.MomentStatInfo, 0), err
}
return list, nil
}

View File

@ -3478,3 +3478,7 @@ func (v *Vas) GetIncomeList(ctx *gin.Context, tx *sqlx.Tx, mid, st, et int64) (l
func (v *Vas) GetTotalFinishIncome(ctx *gin.Context, tx *sqlx.Tx, mid int64, orderIds []string) (int64, error) {
return v.store.GetTotalFinishIncome(ctx, tx, mid, orderIds)
}
func (v *Vas) GetIncomeByTimeSpanGroupByMid(ctx *gin.Context, tx *sqlx.Tx, st, et int64) ([]*dbstruct.StreamerProfit, error) {
return v.store.GetIncomeByTimeSpanGroupByMid(ctx, tx, st, et)
}

View File

@ -1224,3 +1224,8 @@ func (v *Vas) GetLastHourZoneProfit(ctx *gin.Context, st, et int64) ([]*dbstruct
return zoneprofits, zonerefunds, nil
}
// 统计时段内空间进入人数
func (v *Vas) GetLastHourZoneAdmissionInfo(ctx *gin.Context, tx *sqlx.Tx, st, et int64) ([]*dbstruct.ZoneAdmissionInfo, error) {
return v.store.GetLastHourZoneAdmissionInfo(ctx, tx, st, et)
}

View File

@ -173,3 +173,14 @@ func (p *ZoneMoment) OpCountByMidAndStatus(ctx *gin.Context, mid int64, status i
}
return count, nil
}
func (p *ZoneMoment) GetZoneMomentStatInfoList(ctx *gin.Context, st, et int64) ([]*dbstruct.ZoneMomentStatInfo, error) {
list, err := p.store.GetZoneMomentCountByTimeSpanGroupByMid(ctx, st, et)
if err != nil {
logger.Error("GetMomentCountByTimeSpanGroupByMid fail, err: %v", err)
return make([]*dbstruct.ZoneMomentStatInfo, 0), err
}
return list, nil
}

View File

@ -260,6 +260,18 @@ func (s *Service) ConnectToVideoModeration() {
videomoderation.ConnectToContentAuditRTIService(_DefaultContentAuditRTI)
}
// 推荐服务数据库接口
func (s *Service) ConnectToStreamerRecommService(r *StreamerRecommService) {
r.SetStreamerRecommDbService(_DefaultMoment, _DefaultZoneMoment, _DefaultVas)
r.SetOut(func(mids []int64) error {
err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", mids, 0)
if err != nil {
logger.Info("redis cache fail: %v", err)
}
return err
})
}
// Product
func (s *Service) OpCreateProduct(ctx *gin.Context, req *productproto.OpCreateReq) (ec errcode.ErrCode) {
ec = errcode.ErrCodeProductSrvOk

View File

@ -1,36 +1,100 @@
package service
import (
"service/dbstruct"
"fmt"
"math"
"service/api/consts"
"service/apollostruct"
"service/app/mix/service/logic"
"service/library/apollo"
"service/library/logger"
"sort"
"github.com/gin-gonic/gin"
)
type StreamerScore struct {
Mid int64
ZoneMomentCountInThreeDays int64
ZoneMomentCountInAMonth int64
IncomeInAWeek int64
NewZoneMemberCountInThreeDays int64
NewZoneMemberCountInAMonth int64
MomentCountInThreeDays int64
RefundRate float64
Score int64
}
var (
DefaultStreamerRecommService *StreamerRecommService
)
type StreamerRecommService struct {
RecvZoneMomentStatInfo func(ctx *gin.Context, st, et int64) ([]*dbstruct.ZoneMomentStatInfo, error)
RecvMomentStatInfo func(ctx *gin.Context, st, et int64) ([]*dbstruct.MomentStatInfo, error)
RecvStreamerProfit func(ctx *gin.Context, st, et int64) ([]*dbstruct.StreamerProfit, error)
RecvZoneAdmissionInfo func(ctx *gin.Context, st, et int64) ([]*dbstruct.ZoneAdmissionInfo, error)
momentService *logic.Moment
zoneMomentService *logic.ZoneMoment
vasService *logic.Vas
out func([]int64) error
formula *apollostruct.StreamerScoreFormulaCfg
scores map[int64]*StreamerScore
scorelist ByScore
}
func NewStreamerRecommService() *StreamerRecommService {
return new(StreamerRecommService)
}
func (s *StreamerRecommService) SetStreamerRecommDbService(moment *logic.Moment, zonemoment *logic.ZoneMoment, vas *logic.Vas) {
s.momentService = moment
s.zoneMomentService = zonemoment
s.vasService = vas
}
func (s *StreamerRecommService) SetOut(out func([]int64) error) {
s.out = out
}
// 启动跑批
func (s *StreamerRecommService) RunBatch() {
func (s *StreamerRecommService) RunBatch(nt int64) error {
ctx := &gin.Context{}
// 获取计算公式
s.formula = &apollostruct.StreamerScoreFormulaCfg{}
err := apollo.GetJson(consts.ReferentialZoneMomentKey, s.formula, apollo.ApolloOpts().SetNamespace("application"))
if err != nil {
logger.Error("Apollo read failed : %v", err)
s.clear()
return fmt.Errorf("apollo read failed : %v", err)
}
// 获取优先级
s.formula.CalPriority()
// 取数
err = s.recvMsg(ctx, nt)
if err != nil {
logger.Error("recvMsg fail, err: %v", err)
s.clear()
return err
}
// 打分
err = s.score()
if err != nil {
logger.Error("score fail, err: %v", err)
s.clear()
return err
}
// 排序
s.sort()
// 推数
err = s.push()
if err != nil {
logger.Error("push fail, err: %v", err)
s.clear()
return err
}
// 清空
s.clear()
return nil
}
func (s *StreamerRecommService) clear() {
s.scorelist = make([]*StreamerScore, 0)
s.scores = make(map[int64]*StreamerScore)
}
// 取数
@ -55,9 +119,9 @@ func (s *StreamerRecommService) recvMsg(ctx *gin.Context, nt int64) (err error)
}
// 七日内消费金额
err = s.recvStreamerProfit(ctx, beforeAWeek, nt)
err = s.recvStreamerProfitInAWeek(ctx, beforeAWeek, nt)
if err != nil {
logger.Error("recvStreamerProfit fail, err: %v", err)
logger.Error("recvStreamerProfitInAWeek fail, err: %v", err)
return
}
@ -85,27 +149,40 @@ func (s *StreamerRecommService) recvMsg(ctx *gin.Context, nt int64) (err error)
return
}
// 运算
func (s *ScriptsService) score() {
// 打分
func (s *StreamerRecommService) score() error {
for _, v := range s.scores {
v.CalScore(s.formula)
v.SetPriority(s.formula.Priority)
}
return nil
}
// 排序
func (s *StreamerRecommService) sort() {
sort.Sort(s.scorelist)
}
// 转存
func (s *ScriptsService) save() {
func (s *StreamerRecommService) save() {
}
// 推送
func (s *ScriptsService) push() {
func (s *StreamerRecommService) push() error {
list := make([]int64, 0)
for _, v := range s.scorelist {
list = append(list, v.Mid)
}
return s.out(list)
}
// 取数
func (s *StreamerRecommService) recvZoneMomentStatInfoInThreeDays(ctx *gin.Context, st, et int64) (err error) {
zoneMomentStatInfoList, err := s.RecvZoneMomentStatInfo(ctx, st, et)
zoneMomentStatInfoList, err := s.zoneMomentService.GetZoneMomentStatInfoList(ctx, st, et)
if err != nil {
logger.Error("RecvZoneMomentStatInfo in three days fail, err: %v", err)
logger.Error("GetZoneMomentStatInfoList in three days fail, err: %v", err)
return
}
@ -123,9 +200,9 @@ func (s *StreamerRecommService) recvZoneMomentStatInfoInThreeDays(ctx *gin.Conte
func (s *StreamerRecommService) recvZoneMomentStatInfoInAMonth(ctx *gin.Context, st, et int64) (err error) {
zoneMomentStatInfoList, err := s.RecvZoneMomentStatInfo(ctx, st, et)
zoneMomentStatInfoList, err := s.zoneMomentService.GetZoneMomentStatInfoList(ctx, st, et)
if err != nil {
logger.Error("RecvZoneMomentStatInfo in three days fail, err: %v", err)
logger.Error("GetZoneMomentStatInfoList in three days fail, err: %v", err)
return
}
@ -141,11 +218,11 @@ func (s *StreamerRecommService) recvZoneMomentStatInfoInAMonth(ctx *gin.Context,
return
}
func (s *StreamerRecommService) recvStreamerProfit(ctx *gin.Context, st, et int64) (err error) {
func (s *StreamerRecommService) recvStreamerProfitInAWeek(ctx *gin.Context, st, et int64) (err error) {
streamerProfitList, err := s.RecvStreamerProfit(ctx, st, et)
streamerProfitList, err := s.vasService.GetIncomeByTimeSpanGroupByMid(ctx, nil, st, et)
if err != nil {
logger.Error("RecvStreamerProfit fail, err: %v", err)
logger.Error("GetIncomeByTimeSpanGroupByMid fail, err: %v", err)
return
}
@ -163,9 +240,9 @@ func (s *StreamerRecommService) recvStreamerProfit(ctx *gin.Context, st, et int6
func (s *StreamerRecommService) recvZoneAdmissionInfoInThreeDays(ctx *gin.Context, st, et int64) (err error) {
zoneAdmissionInfoList, err := s.RecvZoneAdmissionInfo(ctx, st, et)
zoneAdmissionInfoList, err := s.vasService.GetLastHourZoneAdmissionInfo(ctx, nil, st, et)
if err != nil {
logger.Error("RecvZoneAdmissionInfo in three days fail, err: %v", err)
logger.Error("GetLastHourZoneAdmissionInfo in three days fail, err: %v", err)
return
}
@ -183,9 +260,9 @@ func (s *StreamerRecommService) recvZoneAdmissionInfoInThreeDays(ctx *gin.Contex
func (s *StreamerRecommService) recvZoneAdmissionInfoInAMonth(ctx *gin.Context, st, et int64) (err error) {
zoneAdmissionInfoList, err := s.RecvZoneAdmissionInfo(ctx, st, et)
zoneAdmissionInfoList, err := s.vasService.GetLastHourZoneAdmissionInfo(ctx, nil, st, et)
if err != nil {
logger.Error("RecvZoneAdmissionInfo in three days fail, err: %v", err)
logger.Error("GetLastHourZoneAdmissionInfo in three days fail, err: %v", err)
return
}
@ -203,9 +280,9 @@ func (s *StreamerRecommService) recvZoneAdmissionInfoInAMonth(ctx *gin.Context,
func (s *StreamerRecommService) recvMomentStatInfoInThreeDays(ctx *gin.Context, st, et int64) (err error) {
momentStatInfoList, err := s.RecvMomentStatInfo(ctx, st, et)
momentStatInfoList, err := s.momentService.GetMomentStatInfoList(ctx, st, et)
if err != nil {
logger.Error("RecvMomentStatInfo in three days fail, err: %v", err)
logger.Error("GetMomentStatInfoList in three days fail, err: %v", err)
return
}
@ -220,3 +297,76 @@ func (s *StreamerRecommService) recvMomentStatInfoInThreeDays(ctx *gin.Context,
}
return
}
/*
*@author Robin
*@class 主播打分
*/
type StreamerScore struct {
Mid int64
ZoneMomentCountInThreeDays int64
ZoneMomentCountInAMonth int64
IncomeInAWeek int64
NewZoneMemberCountInThreeDays int64
NewZoneMemberCountInAMonth int64
MomentCountInThreeDays int64
RefundRate float64
Score float64
Priorities []float64
}
func (s *StreamerScore) SetPriority(names []string) {
s.Priorities = make([]float64, 0)
s.Priorities = append(s.Priorities, s.Score)
for _, name := range names {
switch name {
case "zone_moment_count_in_three_days":
s.Priorities = append(s.Priorities, float64(s.ZoneMomentCountInThreeDays))
case "zone_moment_count_in_a_month":
s.Priorities = append(s.Priorities, float64(s.ZoneMomentCountInAMonth))
case "income_in_a_week":
s.Priorities = append(s.Priorities, float64(s.IncomeInAWeek))
case "new_zone_member_count_in_three_days":
s.Priorities = append(s.Priorities, float64(s.NewZoneMemberCountInThreeDays))
case "new_zone_member_count_in_a_month":
s.Priorities = append(s.Priorities, float64(s.NewZoneMemberCountInAMonth))
case "moment_count_in_three_days":
s.Priorities = append(s.Priorities, float64(s.MomentCountInThreeDays))
}
}
}
func (s *StreamerScore) CalScore(formula *apollostruct.StreamerScoreFormulaCfg) {
s.Score += math.Min(100, float64(s.ZoneMomentCountInThreeDays)/formula.ZoneMomentCountInThreeDays.Maximum*100) * formula.ZoneMomentCountInThreeDays.Proportion
s.Score += math.Min(100, float64(s.ZoneMomentCountInAMonth)/formula.ZoneMomentCountInAMonth.Maximum*100) * formula.ZoneMomentCountInAMonth.Proportion
s.Score += math.Min(100, float64(s.IncomeInAWeek)/formula.IncomeInAWeek.Maximum*100) * formula.IncomeInAWeek.Proportion
s.Score += math.Min(100, float64(s.NewZoneMemberCountInThreeDays)/formula.NewZoneMemberCountInThreeDays.Maximum*100) * formula.NewZoneMemberCountInThreeDays.Proportion
s.Score += math.Min(100, float64(s.NewZoneMemberCountInAMonth)/formula.NewZoneMemberCountInAMonth.Maximum*100) * formula.NewZoneMemberCountInAMonth.Proportion
s.Score += math.Min(100, float64(s.MomentCountInThreeDays)/formula.MomentCountInThreeDays.Maximum*100) * formula.MomentCountInThreeDays.Proportion
}
type ByScore []*StreamerScore
func (b ByScore) Len() int {
return len(b)
}
func (b ByScore) Less(i, j int) bool {
range1 := len(b[i].Priorities)
range2 := len(b[j].Priorities)
minRange := range1
if range1 > range2 {
minRange = range2
}
for k := 0; k < minRange; k++ {
if b[i].Priorities[k] < b[j].Priorities[k] {
return true
} else if b[i].Priorities[k] > b[j].Priorities[k] {
return false
}
}
return false
}
func (b ByScore) Swap(i, j int) {
b[i], b[j] = b[j], b[i]
}

View File

@ -10,7 +10,6 @@ import (
daily_statementproto "service/api/proto/daily_statement/proto"
daily_statement_zone_infoproto "service/api/proto/daily_statement_zone_info/proto"
momentproto "service/api/proto/moment/proto"
streamerproto "service/api/proto/streamer/proto"
vasproto "service/api/proto/vas/proto"
vericodeproto "service/api/proto/vericode/proto"
"service/apollostruct"
@ -35,22 +34,12 @@ import (
func (s *CronService) ReloadRecommList(ctx context.Context, param *xxl.RunReq) (msg string) {
logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID))
logger.Info("Refreshing recommendation list cached in redis...")
list, err := _DefaultStreamer.OpList(&gin.Context{}, &streamerproto.OpListReq{
Sort: []string{"-fans"},
})
if err != nil {
logger.Error("OpList fail, err: %v", err)
return fmt.Sprintf("OpList fail, err: %v", err)
}
midList := make([]int64, len(list))
for i, streamer := range list {
midList[i] = util.DerefInt64(streamer.Mid)
}
err = redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", midList, 0)
nt := util.GetHourHalfTimeStamp(time.Now())
err := DefaultStreamerRecommService.RunBatch(nt)
if err != nil {
logger.Error("Redis cache fail, err: %v", err)
return fmt.Sprintf("Redis cache fail, err: %v", err)
logger.Error("RunBatch fail, err: %v", err)
return fmt.Sprintf("RunBatch fail, err: %v", err)
}
logger.Info("Refresh recommendation list cached in redis accomplished...")
return "Refresh recommendation list cached in redis accomplished"

View File

@ -90,6 +90,17 @@ func GetHourStartTimeStamp(t time.Time) int64 {
return duetimecst.Unix()
}
// 获取30分时间戳
func GetHourHalfTimeStamp(t time.Time) int64 {
loc, _ := time.LoadLocation("Asia/Shanghai")
timeStr := fmt.Sprintf("%02d-%02d-%02d %02d:30:00", t.Year(), t.Month(), t.Day(), t.Hour())
duetimecst, err := time.ParseInLocation("2006-1-2 15:04:05", timeStr, loc)
if err != nil {
logger.Error("parse error : %v", err)
}
return duetimecst.Unix()
}
// 获取整分时间戳
func GetMinuteStartTimeStamp(t time.Time) int64 {
loc, _ := time.LoadLocation("Asia/Shanghai")