// service/app/mix/service/streamer_recomm_service.go — 442 lines, 12 KiB, Go

package service
import (
"fmt"
"math"
"service/api/consts"
"service/apollostruct"
"service/app/mix/service/logic"
"service/dbstruct"
"service/library/apollo"
"service/library/logger"
"sort"
streamerproto "service/api/proto/streamer/proto"
"github.com/gin-gonic/gin"
)
// DefaultStreamerRecommService is the package-level handle to the streamer
// recommendation service. Nothing in this file assigns it, so it stays nil
// until code elsewhere sets it.
var DefaultStreamerRecommService *StreamerRecommService
// StreamerRecommService runs the streamer recommendation batch: it collects
// per-streamer activity metrics, scores and ranks the streamers, and hands the
// ranked member IDs to an output callback.
type StreamerRecommService struct {
	// Data-access dependencies, injected through SetStreamerRecommDbService.
	momentService            *logic.Moment
	zoneMomentService        *logic.ZoneMoment
	vasService               *logic.Vas
	streamerService          *logic.Streamer
	streamerScoreService     *logic.StreamerScore
	accountpunishmentService *logic.AccountPunishment

	// out receives the ranked list of streamer member IDs; set through SetOut.
	out func([]int64) error

	// formula is the scoring configuration loaded at the start of each batch.
	formula *apollostruct.StreamerScoreFormulaCfg

	// scores indexes the per-batch score records by member ID, and scorelist
	// holds the same records in sortable form.
	scores    map[int64]*StreamerScore
	scorelist ByScore
}
// NewStreamerRecommService returns a zero-valued service. Dependencies and the
// output callback still have to be injected with SetStreamerRecommDbService
// and SetOut before a batch can run.
func NewStreamerRecommService() *StreamerRecommService {
	return &StreamerRecommService{}
}
// SetStreamerRecommDbService injects the data-access services the batch reads
// from. Each argument is stored as-is; no nil checks are performed.
func (s *StreamerRecommService) SetStreamerRecommDbService(moment *logic.Moment, zonemoment *logic.ZoneMoment, vas *logic.Vas, streamer *logic.Streamer, streamerScore *logic.StreamerScore, accountpunishment *logic.AccountPunishment) {
	s.momentService, s.zoneMomentService = moment, zonemoment
	s.vasService, s.streamerService = vas, streamer
	s.streamerScoreService, s.accountpunishmentService = streamerScore, accountpunishment
}
// SetOut registers the callback that receives the ranked streamer member IDs
// at the end of a batch.
func (s *StreamerRecommService) SetOut(out func([]int64) error) {
	s.out = out
}
// RunBatch executes one complete recommendation batch: initialise state, fetch
// the metrics, score, sort and push the result.
//
// nt is the reference timestamp that the look-back windows are measured from
// (the windows are computed by subtracting second counts, so nt is expected to
// be in Unix seconds).
//
// The first failing stage aborts the batch and its error is returned unchanged.
// Per-batch state is always released before returning.
func (s *StreamerRecommService) RunBatch(nt int64) error {
	// A single deferred clear replaces the hand-written clear on every exit
	// path and also runs if one of the stages panics, so no stale score data
	// survives into the next batch.
	defer s.clear()

	ctx := &gin.Context{}

	if err := s.init(); err != nil {
		logger.Error("init fail, err: %v", err)
		return err
	}

	// Fetch the metrics.
	if err := s.recvMsg(ctx, nt); err != nil {
		logger.Error("recvMsg fail, err: %v", err)
		return err
	}

	// Score every streamer.
	if err := s.score(); err != nil {
		logger.Error("score fail, err: %v", err)
		return err
	}

	// Rank.
	s.sort()

	// Publish the ranking.
	if err := s.push(); err != nil {
		logger.Error("push fail, err: %v", err)
		return err
	}

	return nil
}
// init resets the per-batch state, loads the scoring formula from Apollo and
// creates one empty score record for every streamer returned by OpList.
//
// It returns an error when the formula cannot be read or the streamer list
// cannot be fetched.
func (s *StreamerRecommService) init() error {
	s.clear()

	// Load the scoring formula.
	s.formula = &apollostruct.StreamerScoreFormulaCfg{}
	err := apollo.GetJson(consts.StreamerScoreFormulaKey, s.formula, apollo.ApolloOpts().SetNamespace("application"))
	if err != nil {
		logger.Error("Apollo read failed : %v", err)
		// %w keeps the underlying error reachable through errors.Is/As while
		// producing the same message text as before.
		return fmt.Errorf("apollo read failed : %w", err)
	}

	// Derive the tie-break priority order from the formula.
	s.formula.CalPriority()

	// Seed a score record for every streamer.
	list, err := s.streamerService.OpList(&gin.Context{}, &streamerproto.OpListReq{})
	if err != nil {
		logger.Error("Streamer OpList err: %v", err)
		return err
	}
	for _, streamer := range list {
		mid := streamer.GetMid()
		// A repeated mid would leave an orphaned record in scorelist that the
		// map no longer points to: it would never receive metrics and would be
		// pushed twice. Keep only the first occurrence.
		if _, seen := s.scores[mid]; seen {
			continue
		}
		score := &StreamerScore{Mid: mid}
		s.scores[mid] = score
		s.scorelist = append(s.scorelist, score)
	}
	return nil
}
// clear drops all per-batch score data, leaving both containers empty but
// non-nil so they can be written to immediately.
func (s *StreamerRecommService) clear() {
	s.scores = map[int64]*StreamerScore{}
	s.scorelist = ByScore{}
}
// recvMsg pulls every metric used for scoring into s.scores. The look-back
// windows all end at nt and start 3, 7 or 30 days (counted in seconds) before
// it. The steps run in a fixed order; the first failure is logged and returned
// and the remaining steps are skipped.
func (s *StreamerRecommService) recvMsg(ctx *gin.Context, nt int64) (err error) {
	const secondsPerDay = int64(86400)
	var (
		threeDaysAgo = nt - 3*secondsPerDay
		aWeekAgo     = nt - 7*secondsPerDay
		aMonthAgo    = nt - 30*secondsPerDay
	)

	steps := []struct {
		failFormat string
		run        func() error
	}{
		// Zone moments posted within three days.
		{"recvZoneMomentStatInfoInThreeDays fail, err: %v", func() error {
			return s.recvZoneMomentStatInfoInThreeDays(ctx, threeDaysAgo, nt)
		}},
		// Zone moments posted within thirty days.
		{"recvZoneMomentStatInfoInAMonth fail, err: %v", func() error {
			return s.recvZoneMomentStatInfoInAMonth(ctx, aMonthAgo, nt)
		}},
		// Amount spent within seven days.
		{"recvStreamerProfitInAWeek fail, err: %v", func() error {
			return s.recvStreamerProfitInAWeek(ctx, aWeekAgo, nt)
		}},
		// New zone members within three days.
		{"recvZoneAdmissionInfoInThreeDays fail, err: %v", func() error {
			return s.recvZoneAdmissionInfoInThreeDays(ctx, threeDaysAgo, nt)
		}},
		// New zone members within thirty days.
		{"recvZoneAdmissionInfoInAMonth fail, err: %v", func() error {
			return s.recvZoneAdmissionInfoInAMonth(ctx, aMonthAgo, nt)
		}},
		// Square moments posted within three days.
		{"recvMomentStatInfoInThreeDays fail, err: %v", func() error {
			return s.recvMomentStatInfoInThreeDays(ctx, threeDaysAgo, nt)
		}},
		// Refund rate.
		{"recvRefundRate fail, err: %v", func() error {
			return s.recvRefundRate(ctx)
		}},
	}

	for _, step := range steps {
		if err = step.run(); err != nil {
			logger.Error(step.failFormat, err)
			return err
		}
	}
	return nil
}
// score computes the score of every tracked streamer with the loaded formula
// and then builds that streamer's sort keys from the formula's priority list.
// It always returns nil.
func (s *StreamerRecommService) score() error {
	cfg := s.formula
	for mid := range s.scores {
		entry := s.scores[mid]
		entry.CalScore(cfg)
		entry.SetPriority(cfg.Priority)
	}
	return nil
}
// sort orders scorelist ascending according to ByScore, so the best-ranked
// streamer ends up last.
func (s *StreamerRecommService) sort() {
	var ranking sort.Interface = s.scorelist
	sort.Sort(ranking)
}
// save is the "transfer/store" step of the batch. It is currently an empty
// placeholder and nothing in this file calls it.
func (s *StreamerRecommService) save() {
}
// push publishes the batch result. It walks the sorted scorelist from the
// highest-ranked entry to the lowest, skips streamers that are punished with
// "blocked from being discovered", stores the remaining score records and
// finally hands the ranked member IDs to the output callback.
//
// Persisting the score records is best-effort: a failure is logged and does
// not abort the push. The returned error is the callback's result, or an
// error when no callback has been registered with SetOut.
func (s *StreamerRecommService) push() error {
	// Look up streamers that must not appear in recommendations.
	// NOTE(review): a failed lookup is only logged, so the batch continues
	// with an empty block list and punished streamers can be recommended.
	// Confirm whether this should fail closed instead.
	acctpunishments, err := _DefaultAccountPunishment.OpListByType(&gin.Context{}, consts.AccountPunishment_BlockFromBeingDiscovered)
	if err != nil {
		logger.Error("_DefaultAccountPunishment OpListByType fail, err: %v", err)
	}
	// Only membership matters, so a set is enough.
	blocked := make(map[int64]struct{}, len(acctpunishments))
	for _, acctpunishment := range acctpunishments {
		blocked[acctpunishment.GetMid()] = struct{}{}
	}

	mids := make([]int64, 0, len(s.scorelist))
	records := make([]*dbstruct.StreamerScore, 0, len(s.scorelist))
	// scorelist is sorted ascending; iterate backwards to emit best-first.
	for i := len(s.scorelist) - 1; i >= 0; i-- {
		v := s.scorelist[i]
		if _, isBlocked := blocked[v.Mid]; isBlocked {
			continue
		}
		mids = append(mids, v.Mid)
		records = append(records, &dbstruct.StreamerScore{
			Id:                            v.Mid,
			Mid:                           v.Mid,
			ZoneMomentCountInThreeDays:    v.ZoneMomentCountInThreeDays,
			ZoneMomentCountInAMonth:       v.ZoneMomentCountInAMonth,
			IncomeInAWeek:                 v.IncomeInAWeek,
			NewZoneMemberCountInThreeDays: v.NewZoneMemberCountInThreeDays,
			NewZoneMemberCountInAMonth:    v.NewZoneMemberCountInAMonth,
			MomentCountInThreeDays:        v.MomentCountInThreeDays,
			RefundRate:                    v.RefundRate,
			Score:                         v.Score,
		})
	}

	if err = _DefaultStreamerScore.OpSetStreamerScore(&gin.Context{}, records); err != nil {
		logger.Error("OpSetStreamerScore fail, err: %v", err)
	}

	// Calling a nil func value panics; report the misconfiguration instead.
	if s.out == nil {
		return fmt.Errorf("streamer recomm: output callback is not set")
	}
	return s.out(mids)
}
// recvZoneMomentStatInfoInThreeDays queries the zone-moment statistics for the
// window [st, et] and stores each count in ZoneMomentCountInThreeDays of the
// matching tracked streamer. Entries for untracked member IDs are ignored.
func (s *StreamerRecommService) recvZoneMomentStatInfoInThreeDays(ctx *gin.Context, st, et int64) (err error) {
	stats, err := s.zoneMomentService.GetZoneMomentStatInfoList(ctx, st, et)
	if err != nil {
		logger.Error("GetZoneMomentStatInfoList in three days fail, err: %v", err)
		return err
	}
	for _, stat := range stats {
		if entry, tracked := s.scores[stat.GetMid()]; tracked {
			entry.ZoneMomentCountInThreeDays = stat.GetCount()
		}
	}
	return nil
}
// recvZoneMomentStatInfoInAMonth queries the zone-moment statistics for the
// window [st, et] and stores each count in ZoneMomentCountInAMonth of the
// matching tracked streamer. Entries for untracked member IDs are ignored.
func (s *StreamerRecommService) recvZoneMomentStatInfoInAMonth(ctx *gin.Context, st, et int64) (err error) {
	stats, err := s.zoneMomentService.GetZoneMomentStatInfoList(ctx, st, et)
	if err != nil {
		// The message used to say "in three days", copied from the three-day
		// variant, which made the two failures indistinguishable in the logs.
		logger.Error("GetZoneMomentStatInfoList in a month fail, err: %v", err)
		return err
	}
	for _, stat := range stats {
		if entry, tracked := s.scores[stat.GetMid()]; tracked {
			entry.ZoneMomentCountInAMonth = stat.GetCount()
		}
	}
	return nil
}
// recvStreamerProfitInAWeek queries the income grouped by member ID for the
// window [st, et] and stores each amount in IncomeInAWeek of the matching
// tracked streamer. Entries for untracked member IDs are ignored.
func (s *StreamerRecommService) recvStreamerProfitInAWeek(ctx *gin.Context, st, et int64) (err error) {
	profits, err := s.vasService.GetIncomeByTimeSpanGroupByMid(ctx, nil, st, et)
	if err != nil {
		logger.Error("GetIncomeByTimeSpanGroupByMid fail, err: %v", err)
		return err
	}
	for _, profit := range profits {
		if entry, tracked := s.scores[profit.GetMid()]; tracked {
			entry.IncomeInAWeek = profit.GetAmount()
		}
	}
	return nil
}
// recvZoneAdmissionInfoInThreeDays queries zone admissions for the window
// [st, et] and stores each number in NewZoneMemberCountInThreeDays of the
// matching tracked streamer. Entries for untracked member IDs are ignored.
//
// The query method is named GetLastHourZoneAdmissionInfo but is called here
// with the caller-supplied window.
func (s *StreamerRecommService) recvZoneAdmissionInfoInThreeDays(ctx *gin.Context, st, et int64) (err error) {
	admissions, err := s.vasService.GetLastHourZoneAdmissionInfo(ctx, nil, st, et)
	if err != nil {
		logger.Error("GetLastHourZoneAdmissionInfo in three days fail, err: %v", err)
		return err
	}
	for _, admission := range admissions {
		if entry, tracked := s.scores[admission.GetMid()]; tracked {
			entry.NewZoneMemberCountInThreeDays = admission.GetNum()
		}
	}
	return nil
}
// recvZoneAdmissionInfoInAMonth queries zone admissions for the window
// [st, et] and stores each number in NewZoneMemberCountInAMonth of the
// matching tracked streamer. Entries for untracked member IDs are ignored.
//
// The query method is named GetLastHourZoneAdmissionInfo but is called here
// with the caller-supplied window.
func (s *StreamerRecommService) recvZoneAdmissionInfoInAMonth(ctx *gin.Context, st, et int64) (err error) {
	admissions, err := s.vasService.GetLastHourZoneAdmissionInfo(ctx, nil, st, et)
	if err != nil {
		// The message used to say "in three days", copied from the three-day
		// variant, which made the two failures indistinguishable in the logs.
		logger.Error("GetLastHourZoneAdmissionInfo in a month fail, err: %v", err)
		return err
	}
	for _, admission := range admissions {
		if entry, tracked := s.scores[admission.GetMid()]; tracked {
			entry.NewZoneMemberCountInAMonth = admission.GetNum()
		}
	}
	return nil
}
// recvMomentStatInfoInThreeDays queries the moment statistics for the window
// [st, et] and stores each count in MomentCountInThreeDays of the matching
// tracked streamer. Entries for untracked member IDs are ignored.
func (s *StreamerRecommService) recvMomentStatInfoInThreeDays(ctx *gin.Context, st, et int64) (err error) {
	stats, err := s.momentService.GetMomentStatInfoList(ctx, st, et)
	if err != nil {
		logger.Error("GetMomentStatInfoList in three days fail, err: %v", err)
		return err
	}
	for _, stat := range stats {
		if entry, tracked := s.scores[stat.GetMid()]; tracked {
			entry.MomentCountInThreeDays = stat.GetCount()
		}
	}
	return nil
}
// recvRefundRate queries the refund rate grouped by member ID and stores it in
// RefundRate of the matching tracked streamer. Entries for untracked member
// IDs are ignored. No time window is passed to the query.
func (s *StreamerRecommService) recvRefundRate(ctx *gin.Context) (err error) {
	rates, err := s.vasService.GetRefundRateGroupByMid(ctx, nil)
	if err != nil {
		// The message used to claim "in three days", but this query takes no
		// time window; the copy-pasted wording was misleading.
		logger.Error("GetRefundRateGroupByMid fail, err: %v", err)
		return err
	}
	for _, rate := range rates {
		if entry, tracked := s.scores[rate.GetMid()]; tracked {
			entry.RefundRate = rate.GetRefundRate()
		}
	}
	return nil
}
// StreamerScore is the per-streamer scoring record: the raw metrics gathered
// for one batch, the resulting score and the ordered sort keys.
//
// Author: Robin
type StreamerScore struct {
	Mid                           int64   // identifier of the streamer being scored
	ZoneMomentCountInThreeDays    int64   // zone moments within three days
	ZoneMomentCountInAMonth       int64   // zone moments within a month
	IncomeInAWeek                 int64   // income within a week
	NewZoneMemberCountInThreeDays int64   // new zone members within three days
	NewZoneMemberCountInAMonth    int64   // new zone members within a month
	MomentCountInThreeDays        int64   // moments within three days
	RefundRate                    float64 // refund rate
	Score                         float64 // weighted total
	// Priorities holds the sort keys in comparison order: Score first, then
	// the metrics selected by SetPriority.
	Priorities []float64
}

// SetPriority rebuilds Priorities. The first key is always Score; after it,
// one key is appended per entry of names, in order, using the metric that the
// name identifies. Names that match no metric are skipped without leaving a
// gap.
func (s *StreamerScore) SetPriority(names []string) {
	metrics := map[string]int64{
		"zone_moment_count_in_three_days":     s.ZoneMomentCountInThreeDays,
		"zone_moment_count_in_a_month":        s.ZoneMomentCountInAMonth,
		"income_in_a_week":                    s.IncomeInAWeek,
		"new_zone_member_count_in_three_days": s.NewZoneMemberCountInThreeDays,
		"new_zone_member_count_in_a_month":    s.NewZoneMemberCountInAMonth,
		"moment_count_in_three_days":          s.MomentCountInThreeDays,
	}

	keys := make([]float64, 0, len(names)+1)
	keys = append(keys, s.Score)
	for _, name := range names {
		if value, known := metrics[name]; known {
			keys = append(keys, float64(value))
		}
	}
	s.Priorities = keys
}
// CalScore adds the weighted contribution of each of the six count/income
// metrics to s.Score. A metric contributes
//
//	min(100, value/Maximum*100) * Proportion
//
// using that metric's Maximum and Proportion from formula. The contributions
// are added to the existing Score, so calling CalScore twice on the same
// record accumulates. RefundRate does not take part in the score.
//
// formula must be non-nil.
func (s *StreamerScore) CalScore(formula *apollostruct.StreamerScoreFormulaCfg) {
	weighted := func(value int64, maximum, proportion float64) float64 {
		percent := float64(value) / maximum * 100
		// A zero value over a zero Maximum is 0/0 = NaN. NaN would turn the
		// whole Score into NaN and make the sort comparisons inconsistent, so
		// such a metric contributes nothing instead.
		if math.IsNaN(percent) {
			return 0
		}
		return math.Min(100, percent) * proportion
	}

	s.Score += weighted(s.ZoneMomentCountInThreeDays, formula.ZoneMomentCountInThreeDays.Maximum, formula.ZoneMomentCountInThreeDays.Proportion)
	s.Score += weighted(s.ZoneMomentCountInAMonth, formula.ZoneMomentCountInAMonth.Maximum, formula.ZoneMomentCountInAMonth.Proportion)
	s.Score += weighted(s.IncomeInAWeek, formula.IncomeInAWeek.Maximum, formula.IncomeInAWeek.Proportion)
	s.Score += weighted(s.NewZoneMemberCountInThreeDays, formula.NewZoneMemberCountInThreeDays.Maximum, formula.NewZoneMemberCountInThreeDays.Proportion)
	s.Score += weighted(s.NewZoneMemberCountInAMonth, formula.NewZoneMemberCountInAMonth.Maximum, formula.NewZoneMemberCountInAMonth.Proportion)
	s.Score += weighted(s.MomentCountInThreeDays, formula.MomentCountInThreeDays.Maximum, formula.MomentCountInThreeDays.Proportion)
}
// ByScore implements sort.Interface over score records, ordering them
// ascending by their Priorities keys compared position by position.
type ByScore []*StreamerScore

// Len reports the number of records.
func (b ByScore) Len() int { return len(b) }

// Less reports whether record i ranks strictly below record j. The keys are
// compared pairwise over the length of the shorter key list and the first
// differing pair decides. Records whose shared keys are all equal are not
// ordered relative to each other, regardless of any extra keys.
func (b ByScore) Less(i, j int) bool {
	left, right := b[i].Priorities, b[j].Priorities
	if len(right) < len(left) {
		left = left[:len(right)]
	}
	for k, lv := range left {
		rv := right[k]
		switch {
		case lv < rv:
			return true
		case lv > rv:
			return false
		}
	}
	return false
}

// Swap exchanges the records at positions i and j.
func (b ByScore) Swap(i, j int) {
	first, second := b[i], b[j]
	b[i], b[j] = second, first
}