|
|
|
@ -2,7 +2,6 @@ package service
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"fmt"
|
|
|
|
|
"math"
|
|
|
|
|
"service/api/consts"
|
|
|
|
|
"service/api/errcode"
|
|
|
|
|
"service/api/interfaces"
|
|
|
|
@ -767,18 +766,27 @@ func (s *Service) utilGetStreamerRecommList(ctx *gin.Context) (recommlist []int6
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Service) utilGetUserVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (offset int64, err error) {
|
|
|
|
|
offset = int64(0)
|
|
|
|
|
func (s *Service) utilGetInitUserVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (err error) {
|
|
|
|
|
uservisitoffset, err := _DefaultUserVisitOffset.OpGetUserVisitOffset(ctx, mid)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("OpGetUserVisitOffset fail, err: %v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var execFunc func(*gin.Context, *dbstruct.UserVisitOffset) error
|
|
|
|
|
|
|
|
|
|
if uservisitoffset == nil {
|
|
|
|
|
nowoffset := (offset + consts.StreamerRecommThroughput) % recommlistLength
|
|
|
|
|
err = _DefaultUserVisitOffset.OpCreate(ctx, &dbstruct.UserVisitOffset{
|
|
|
|
|
execFunc = _DefaultUserVisitOffset.OpCreate
|
|
|
|
|
} else {
|
|
|
|
|
execFunc = _DefaultUserVisitOffset.OpUpdate
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 吞吐量大于等于推荐数组长度,则这次获取后已经触底
|
|
|
|
|
if consts.StreamerRecommThroughput >= recommlistLength {
|
|
|
|
|
err = execFunc(ctx, &dbstruct.UserVisitOffset{
|
|
|
|
|
Id: mid,
|
|
|
|
|
StreamerRecommOffset: nowoffset,
|
|
|
|
|
StreamerRecommOffset: 0,
|
|
|
|
|
BottomFlag: goproto.Int64(1),
|
|
|
|
|
Ver: 0,
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
@ -786,8 +794,32 @@ func (s *Service) utilGetUserVisitOffset(ctx *gin.Context, mid int64, recommlist
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
offset = uservisitoffset.StreamerRecommOffset
|
|
|
|
|
nowoffset := (offset + consts.StreamerRecommThroughput) % recommlistLength
|
|
|
|
|
err = execFunc(ctx, &dbstruct.UserVisitOffset{
|
|
|
|
|
Id: mid,
|
|
|
|
|
StreamerRecommOffset: consts.StreamerRecommThroughput,
|
|
|
|
|
BottomFlag: goproto.Int64(0),
|
|
|
|
|
Ver: 0,
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("_DefaultUserVisitOffset OpCreate fail, err: %v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Service) utilGetUpUserVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (uservisitoffset *dbstruct.UserVisitOffset, err error) {
|
|
|
|
|
uservisitoffset, err = _DefaultUserVisitOffset.OpGetUserVisitOffset(ctx, mid)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("OpGetUserVisitOffset fail, err: %v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if uservisitoffset == nil {
|
|
|
|
|
// 默认用户刚进推荐页面的第一次操作一定是冷启动操作,游标也由冷启动创建
|
|
|
|
|
return nil, qmgo.ErrNoSuchDocuments
|
|
|
|
|
} else {
|
|
|
|
|
nowoffset := (uservisitoffset.StreamerRecommOffset + consts.StreamerRecommThroughput) % recommlistLength
|
|
|
|
|
err = _DefaultUserVisitOffset.OpUpdate(ctx, &dbstruct.UserVisitOffset{
|
|
|
|
|
Id: uservisitoffset.Id,
|
|
|
|
|
StreamerRecommOffset: nowoffset,
|
|
|
|
@ -801,70 +833,113 @@ func (s *Service) utilGetUserVisitOffset(ctx *gin.Context, mid int64, recommlist
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Service) utilGetUpStreamerRecommList(ctx *gin.Context, recommlist []int64, mid int64) (recommStreamerList []*streamerproto.ApiListExtVO, err error) {
|
|
|
|
|
// 获取用户游标
|
|
|
|
|
offset := int64(0)
|
|
|
|
|
recommListLength := int64(len(recommlist))
|
|
|
|
|
if consts.StreamerRecommThroughput < recommListLength {
|
|
|
|
|
offset, err = s.utilGetUserVisitOffset(ctx, mid, recommListLength)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("utilGetUserVisitOffset fail, err: %v", err)
|
|
|
|
|
func (s *Service) utilGetDownUserVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (uservisitoffset *dbstruct.UserVisitOffset, err error) {
|
|
|
|
|
uservisitoffset, err = _DefaultUserVisitOffset.OpGetUserVisitOffset(ctx, mid)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("OpGetUserVisitOffset fail, err: %v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if uservisitoffset == nil {
|
|
|
|
|
// 默认用户刚进推荐页面的第一次操作一定是冷启动操作,游标也由冷启动创建
|
|
|
|
|
return nil, qmgo.ErrNoSuchDocuments
|
|
|
|
|
} else {
|
|
|
|
|
if uservisitoffset.GetBottomFlag() == 1 {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
offset := uservisitoffset.StreamerRecommOffset
|
|
|
|
|
if offset+consts.StreamerRecommThroughput >= recommlistLength {
|
|
|
|
|
err = _DefaultUserVisitOffset.OpUpdate(ctx, &dbstruct.UserVisitOffset{
|
|
|
|
|
Id: uservisitoffset.Id,
|
|
|
|
|
StreamerRecommOffset: 0,
|
|
|
|
|
BottomFlag: goproto.Int64(1),
|
|
|
|
|
Ver: uservisitoffset.Ver,
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("_DefaultUserVisitOffset OpUpdate fail, err: %v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
err = _DefaultUserVisitOffset.OpUpdate(ctx, &dbstruct.UserVisitOffset{
|
|
|
|
|
Id: uservisitoffset.Id,
|
|
|
|
|
StreamerRecommOffset: offset + consts.StreamerRecommThroughput,
|
|
|
|
|
Ver: uservisitoffset.Ver,
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("_DefaultUserVisitOffset OpUpdate fail, err: %v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 根据用户游标查询得到结果
|
|
|
|
|
midList := make([]int64, 0)
|
|
|
|
|
for i := 0; i < int(math.Min(float64(consts.StreamerRecommThroughput), float64(recommListLength))); i++ {
|
|
|
|
|
index := (offset + int64(i)) % recommListLength
|
|
|
|
|
midList = append(midList, recommlist[index])
|
|
|
|
|
}
|
|
|
|
|
accountList, err := _DefaultAccount.OpListByMids(ctx, &accountproto.OpListByMidsReq{
|
|
|
|
|
Mids: midList,
|
|
|
|
|
Sort: []string{"_id"},
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("_DefaultAccount OpListByMids fail, err: %v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
streamerExts := make([]streamerproto.StreamerExtVO, len(accountList))
|
|
|
|
|
for i := range streamerExts {
|
|
|
|
|
streamerExts[i] = &streamerproto.ApiListExtVO{}
|
|
|
|
|
}
|
|
|
|
|
ec := s.utilExtendAccountsIntoStreamerExts(ctx, accountList, streamerExts)
|
|
|
|
|
if ec != errcode.ErrCodeStreamerSrvOk {
|
|
|
|
|
err = fmt.Errorf("Extend accountlist into streamer_exts fail")
|
|
|
|
|
logger.Error("Extend accountlist into streamer_exts fail")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
recommStreamerList = make([]*streamerproto.ApiListExtVO, len(streamerExts))
|
|
|
|
|
for i, streamerExt := range streamerExts {
|
|
|
|
|
vo, _ := streamerExt.(*streamerproto.ApiListExtVO)
|
|
|
|
|
recommStreamerList[i] = vo
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Service) utilGetDownStreamerRecommList(ctx *gin.Context, recommlist []int64, mid int64) (recommStreamerList []*streamerproto.ApiListExtVO, err error) {
|
|
|
|
|
// 获取用户游标
|
|
|
|
|
offset := int64(0)
|
|
|
|
|
recommListLength := int64(len(recommlist))
|
|
|
|
|
if consts.StreamerRecommThroughput < recommListLength {
|
|
|
|
|
offset, err = s.utilGetUserVisitOffset(ctx, mid, recommListLength)
|
|
|
|
|
// 获取该用户当前在主播推荐数组中的游标和是否已经触底标志
|
|
|
|
|
func (s *Service) utilGetStreamerRecommlistOffsetAndBottomeFlag(ctx *gin.Context, recommListLength int64, mid int64, opType int64) (offset int64, bottomFlag int64, err error) {
|
|
|
|
|
offset = int64(0)
|
|
|
|
|
var uservisitoffset *dbstruct.UserVisitOffset
|
|
|
|
|
|
|
|
|
|
// 初始化操作
|
|
|
|
|
switch opType {
|
|
|
|
|
case consts.Recomm_Init:
|
|
|
|
|
err = s.utilGetInitUserVisitOffset(ctx, mid, int64(recommListLength))
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("utilGetUserVisitOffset fail, err: %v", err)
|
|
|
|
|
logger.Error("utilGetInitUserVisitOffset fail, err: %v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
// 向上滚动操作
|
|
|
|
|
case consts.Recomm_Up:
|
|
|
|
|
// 若吞吐量比推荐数组长度小,则正常操作,否则认为游标永远是0
|
|
|
|
|
if consts.StreamerRecommThroughput < recommListLength {
|
|
|
|
|
uservisitoffset, err = s.utilGetUpUserVisitOffset(ctx, mid, int64(recommListLength))
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("utilGetUpUserVisitOffset fail, err: %v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
offset = uservisitoffset.StreamerRecommOffset
|
|
|
|
|
}
|
|
|
|
|
// 向下滚动操作
|
|
|
|
|
case consts.Recomm_Down:
|
|
|
|
|
// 若吞吐量比推荐数组长度小,则正常操作,否则认为直接触底
|
|
|
|
|
if consts.StreamerRecommThroughput < recommListLength {
|
|
|
|
|
uservisitoffset, err = s.utilGetDownUserVisitOffset(ctx, mid, int64(recommListLength))
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("utilGetUpUserVisitOffset fail, err: %v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if uservisitoffset.GetBottomFlag() == 1 {
|
|
|
|
|
return 0, 1, nil
|
|
|
|
|
}
|
|
|
|
|
offset = uservisitoffset.StreamerRecommOffset
|
|
|
|
|
} else {
|
|
|
|
|
return 0, 1, nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Service) utilGetStreamerRecommListVO(ctx *gin.Context, recommlist []int64, mid int64, opType int64) (recommStreamerList []*streamerproto.ApiListExtVO, err error) {
|
|
|
|
|
// 获取用户游标
|
|
|
|
|
offset := int64(0)
|
|
|
|
|
recommListLength := len(recommlist)
|
|
|
|
|
upperBound := recommListLength
|
|
|
|
|
|
|
|
|
|
offset, bottomFlag, err := s.utilGetStreamerRecommlistOffsetAndBottomeFlag(ctx, int64(recommListLength), mid, opType)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("utilGetStreamerRecommlistOffsetAndBottomeFlag fail, err: %v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if bottomFlag == 1 { // 已触底
|
|
|
|
|
return make([]*streamerproto.ApiListExtVO, 0), nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 根据用户游标查询得到结果
|
|
|
|
|
midList := make([]int64, 0)
|
|
|
|
|
for i := 0; i < int(math.Min(float64(consts.StreamerRecommThroughput), float64(recommListLength))); i++ {
|
|
|
|
|
index := (offset + int64(i)) % recommListLength
|
|
|
|
|
for i := 0; i < upperBound; i++ {
|
|
|
|
|
index := (offset + int64(i)) % int64(recommListLength)
|
|
|
|
|
midList = append(midList, recommlist[index])
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
accountList, err := _DefaultAccount.OpListByMids(ctx, &accountproto.OpListByMidsReq{
|
|
|
|
|
Mids: midList,
|
|
|
|
|
Sort: []string{"_id"},
|
|
|
|
@ -874,6 +949,17 @@ func (s *Service) utilGetDownStreamerRecommList(ctx *gin.Context, recommlist []i
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 构建一个accountList -> midList 索引的map
|
|
|
|
|
_accountIndexMap := make(map[int]int)
|
|
|
|
|
for i, account := range accountList {
|
|
|
|
|
mid1 := util.DerefInt64(account.Mid)
|
|
|
|
|
for j, mid2 := range midList {
|
|
|
|
|
if mid1 == mid2 {
|
|
|
|
|
_accountIndexMap[i] = j
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
streamerExts := make([]streamerproto.StreamerExtVO, len(accountList))
|
|
|
|
|
for i := range streamerExts {
|
|
|
|
|
streamerExts[i] = &streamerproto.ApiListExtVO{}
|
|
|
|
@ -887,7 +973,8 @@ func (s *Service) utilGetDownStreamerRecommList(ctx *gin.Context, recommlist []i
|
|
|
|
|
recommStreamerList = make([]*streamerproto.ApiListExtVO, len(streamerExts))
|
|
|
|
|
for i, streamerExt := range streamerExts {
|
|
|
|
|
vo, _ := streamerExt.(*streamerproto.ApiListExtVO)
|
|
|
|
|
recommStreamerList[i] = vo
|
|
|
|
|
// 从下标位置映射回去
|
|
|
|
|
recommStreamerList[_accountIndexMap[i]] = vo
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return
|
|
|
|
|