by Robin at 20240308; add bottom flag

This commit is contained in:
Leufolium 2024-03-08 15:15:09 +08:00
parent 5a965d4f75
commit bed1313e84
5 changed files with 170 additions and 78 deletions

View File

@ -13,4 +13,5 @@ const (
const (
Recomm_Down = 0
Recomm_Up = 1
Recomm_Init = 2
)

View File

@ -3318,10 +3318,14 @@ func (m *Mongo) UpdateUserVisitOffset(ctx *gin.Context, uservisitoffset *dbstruc
"_id": uservisitoffset.Id,
"ver": uservisitoffset.Ver,
}
set := qmgo.M{
"streamer_recomm_offset": uservisitoffset.StreamerRecommOffset,
}
if uservisitoffset.BottomFlag != nil {
set["bottom_flag"] = uservisitoffset.GetBottomFlag()
}
update := qmgo.M{
"$set": qmgo.M{
"streamer_recomm_offset": uservisitoffset.StreamerRecommOffset,
},
"$set": set,
}
err := col.UpdateOne(ctx, filter, update)
return err

View File

@ -1244,20 +1244,11 @@ func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto.
}
}
if req.OpType == consts.Recomm_Up {
recommStreamerList, err = s.utilGetUpStreamerRecommList(ctx, recommlist, req.Mid)
if err != nil {
logger.Error("utilGetUpStreamerRecommList fail, err: %v", err)
ec = errcode.ErrCodeStreamerSrvFail
return
}
} else {
recommStreamerList, err = s.utilGetUpStreamerRecommList(ctx, recommlist, req.Mid)
if err != nil {
logger.Error("utilGetUpStreamerRecommList fail, err: %v", err)
ec = errcode.ErrCodeStreamerSrvFail
return
}
recommStreamerList, err = s.utilGetStreamerRecommListVO(ctx, recommlist, req.Mid, req.OpType)
if err != nil {
logger.Error("utilGetUpStreamerRecommList fail, err: %v", err)
ec = errcode.ErrCodeStreamerSrvFail
return
}
return

View File

@ -2,7 +2,6 @@ package service
import (
"fmt"
"math"
"service/api/consts"
"service/api/errcode"
"service/api/interfaces"
@ -27,6 +26,7 @@ import (
"time"
"github.com/gin-gonic/gin"
"github.com/qiniu/qmgo"
"go.mongodb.org/mongo-driver/mongo"
goproto "google.golang.org/protobuf/proto"
)
@ -666,18 +666,27 @@ func (s *Service) utilGetStreamerRecommList(ctx *gin.Context) (recommlist []int6
return
}
func (s *Service) utilGetUserVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (offset int64, err error) {
offset = int64(0)
func (s *Service) utilGetInitUserVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (err error) {
uservisitoffset, err := _DefaultUserVisitOffset.OpGetUserVisitOffset(ctx, mid)
if err != nil {
logger.Error("OpGetUserVisitOffset fail, err: %v", err)
return
}
var execFunc func(*gin.Context, *dbstruct.UserVisitOffset) error
if uservisitoffset == nil {
nowoffset := (offset + consts.StreamerRecommThroughput) % recommlistLength
err = _DefaultUserVisitOffset.OpCreate(ctx, &dbstruct.UserVisitOffset{
execFunc = _DefaultUserVisitOffset.OpCreate
} else {
execFunc = _DefaultUserVisitOffset.OpUpdate
}
// 吞吐量大于等于推荐数组长度,则这次获取后已经触底
if consts.StreamerRecommThroughput >= recommlistLength {
err = execFunc(ctx, &dbstruct.UserVisitOffset{
Id: mid,
StreamerRecommOffset: nowoffset,
StreamerRecommOffset: 0,
BottomFlag: goproto.Int64(1),
Ver: 0,
})
if err != nil {
@ -685,8 +694,32 @@ func (s *Service) utilGetUserVisitOffset(ctx *gin.Context, mid int64, recommlist
return
}
} else {
offset = uservisitoffset.StreamerRecommOffset
nowoffset := (offset + consts.StreamerRecommThroughput) % recommlistLength
err = execFunc(ctx, &dbstruct.UserVisitOffset{
Id: mid,
StreamerRecommOffset: consts.StreamerRecommThroughput,
BottomFlag: goproto.Int64(0),
Ver: 0,
})
if err != nil {
logger.Error("_DefaultUserVisitOffset OpCreate fail, err: %v", err)
return
}
}
return nil
}
func (s *Service) utilGetUpUserVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (uservisitoffset *dbstruct.UserVisitOffset, err error) {
uservisitoffset, err = _DefaultUserVisitOffset.OpGetUserVisitOffset(ctx, mid)
if err != nil {
logger.Error("OpGetUserVisitOffset fail, err: %v", err)
return
}
if uservisitoffset == nil {
// 默认用户刚进推荐页面的第一次操作一定是冷启动操作,游标也由冷启动创建
return nil, qmgo.ErrNoSuchDocuments
} else {
nowoffset := (uservisitoffset.StreamerRecommOffset + consts.StreamerRecommThroughput) % recommlistLength
err = _DefaultUserVisitOffset.OpUpdate(ctx, &dbstruct.UserVisitOffset{
Id: uservisitoffset.Id,
StreamerRecommOffset: nowoffset,
@ -700,70 +733,113 @@ func (s *Service) utilGetUserVisitOffset(ctx *gin.Context, mid int64, recommlist
return
}
func (s *Service) utilGetUpStreamerRecommList(ctx *gin.Context, recommlist []int64, mid int64) (recommStreamerList []*streamerproto.ApiListExtVO, err error) {
// 获取用户游标
offset := int64(0)
recommListLength := int64(len(recommlist))
if consts.StreamerRecommThroughput < recommListLength {
offset, err = s.utilGetUserVisitOffset(ctx, mid, recommListLength)
if err != nil {
logger.Error("utilGetUserVisitOffset fail, err: %v", err)
func (s *Service) utilGetDownUserVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (uservisitoffset *dbstruct.UserVisitOffset, err error) {
uservisitoffset, err = _DefaultUserVisitOffset.OpGetUserVisitOffset(ctx, mid)
if err != nil {
logger.Error("OpGetUserVisitOffset fail, err: %v", err)
return
}
if uservisitoffset == nil {
// 默认用户刚进推荐页面的第一次操作一定是冷启动操作,游标也由冷启动创建
return nil, qmgo.ErrNoSuchDocuments
} else {
if uservisitoffset.GetBottomFlag() == 1 {
return
}
offset := uservisitoffset.StreamerRecommOffset
if offset+consts.StreamerRecommThroughput >= recommlistLength {
err = _DefaultUserVisitOffset.OpUpdate(ctx, &dbstruct.UserVisitOffset{
Id: uservisitoffset.Id,
StreamerRecommOffset: 0,
BottomFlag: goproto.Int64(1),
Ver: uservisitoffset.Ver,
})
if err != nil {
logger.Error("_DefaultUserVisitOffset OpUpdate fail, err: %v", err)
return
}
} else {
err = _DefaultUserVisitOffset.OpUpdate(ctx, &dbstruct.UserVisitOffset{
Id: uservisitoffset.Id,
StreamerRecommOffset: offset + consts.StreamerRecommThroughput,
Ver: uservisitoffset.Ver,
})
if err != nil {
logger.Error("_DefaultUserVisitOffset OpUpdate fail, err: %v", err)
return
}
}
}
// 根据用户游标查询得到结果
midList := make([]int64, 0)
for i := 0; i < int(math.Min(float64(consts.StreamerRecommThroughput), float64(recommListLength))); i++ {
index := (offset + int64(i)) % recommListLength
midList = append(midList, recommlist[index])
}
accountList, err := _DefaultAccount.OpListByMids(ctx, &accountproto.OpListByMidsReq{
Mids: midList,
Sort: []string{"_id"},
})
if err != nil {
logger.Error("_DefaultAccount OpListByMids fail, err: %v", err)
return
}
streamerExts := make([]streamerproto.StreamerExtVO, len(accountList))
for i := range streamerExts {
streamerExts[i] = &streamerproto.ApiListExtVO{}
}
ec := s.utilExtendAccountsIntoStreamerExts(ctx, accountList, streamerExts)
if ec != errcode.ErrCodeStreamerSrvOk {
err = fmt.Errorf("Extend accountlist into streamer_exts fail")
logger.Error("Extend accountlist into streamer_exts fail")
return
}
recommStreamerList = make([]*streamerproto.ApiListExtVO, len(streamerExts))
for i, streamerExt := range streamerExts {
vo, _ := streamerExt.(*streamerproto.ApiListExtVO)
recommStreamerList[i] = vo
}
return
}
func (s *Service) utilGetDownStreamerRecommList(ctx *gin.Context, recommlist []int64, mid int64) (recommStreamerList []*streamerproto.ApiListExtVO, err error) {
// 获取用户游标
offset := int64(0)
recommListLength := int64(len(recommlist))
if consts.StreamerRecommThroughput < recommListLength {
offset, err = s.utilGetUserVisitOffset(ctx, mid, recommListLength)
// 获取该用户当前在主播推荐数组中的游标和是否已经触底标志
func (s *Service) utilGetStreamerRecommlistOffsetAndBottomeFlag(ctx *gin.Context, recommListLength int64, mid int64, opType int64) (offset int64, bottomFlag int64, err error) {
offset = int64(0)
var uservisitoffset *dbstruct.UserVisitOffset
// 初始化操作
switch opType {
case consts.Recomm_Init:
err = s.utilGetInitUserVisitOffset(ctx, mid, int64(recommListLength))
if err != nil {
logger.Error("utilGetUserVisitOffset fail, err: %v", err)
logger.Error("utilGetInitUserVisitOffset fail, err: %v", err)
return
}
// 向上滚动操作
case consts.Recomm_Up:
// 若吞吐量比推荐数组长度小则正常操作否则认为游标永远是0
if consts.StreamerRecommThroughput < recommListLength {
uservisitoffset, err = s.utilGetUpUserVisitOffset(ctx, mid, int64(recommListLength))
if err != nil {
logger.Error("utilGetUpUserVisitOffset fail, err: %v", err)
return
}
offset = uservisitoffset.StreamerRecommOffset
}
// 向下滚动操作
case consts.Recomm_Down:
// 若吞吐量比推荐数组长度小,则正常操作,否则认为直接触底
if consts.StreamerRecommThroughput < recommListLength {
uservisitoffset, err = s.utilGetDownUserVisitOffset(ctx, mid, int64(recommListLength))
if err != nil {
logger.Error("utilGetUpUserVisitOffset fail, err: %v", err)
return
}
if uservisitoffset.GetBottomFlag() == 1 {
return 0, 1, nil
}
offset = uservisitoffset.StreamerRecommOffset
} else {
return 0, 1, nil
}
}
return
}
func (s *Service) utilGetStreamerRecommListVO(ctx *gin.Context, recommlist []int64, mid int64, opType int64) (recommStreamerList []*streamerproto.ApiListExtVO, err error) {
// 获取用户游标
offset := int64(0)
recommListLength := len(recommlist)
upperBound := recommListLength
offset, bottomFlag, err := s.utilGetStreamerRecommlistOffsetAndBottomeFlag(ctx, int64(recommListLength), mid, opType)
if err != nil {
logger.Error("utilGetStreamerRecommlistOffsetAndBottomeFlag fail, err: %v", err)
return
}
if bottomFlag == 1 { // 已触底
return make([]*streamerproto.ApiListExtVO, 0), nil
}
// 根据用户游标查询得到结果
midList := make([]int64, 0)
for i := 0; i < int(math.Min(float64(consts.StreamerRecommThroughput), float64(recommListLength))); i++ {
index := (offset + int64(i)) % recommListLength
for i := 0; i < upperBound; i++ {
index := (offset + int64(i)) % int64(recommListLength)
midList = append(midList, recommlist[index])
}
accountList, err := _DefaultAccount.OpListByMids(ctx, &accountproto.OpListByMidsReq{
Mids: midList,
Sort: []string{"_id"},
@ -773,6 +849,17 @@ func (s *Service) utilGetDownStreamerRecommList(ctx *gin.Context, recommlist []i
return
}
// 构建一个accountList -> midList 索引的map
_accountIndexMap := make(map[int]int)
for i, account := range accountList {
mid1 := util.DerefInt64(account.Mid)
for j, mid2 := range midList {
if mid1 == mid2 {
_accountIndexMap[i] = j
}
}
}
streamerExts := make([]streamerproto.StreamerExtVO, len(accountList))
for i := range streamerExts {
streamerExts[i] = &streamerproto.ApiListExtVO{}
@ -786,7 +873,8 @@ func (s *Service) utilGetDownStreamerRecommList(ctx *gin.Context, recommlist []i
recommStreamerList = make([]*streamerproto.ApiListExtVO, len(streamerExts))
for i, streamerExt := range streamerExts {
vo, _ := streamerExt.(*streamerproto.ApiListExtVO)
recommStreamerList[i] = vo
// 从下标位置映射回去
recommStreamerList[_accountIndexMap[i]] = vo
}
return

View File

@ -1,7 +1,15 @@
package dbstruct
type UserVisitOffset struct {
Id int64 `json:"id" bson:"_id"` //id,用户的mid
StreamerRecommOffset int64 `json:"streamer_recomm_offset" bson:"streamer_recomm_offset"` //主播推荐列表偏移量
Ver int64 `json:"ver" bson:"ver"` //乐观锁
Id int64 `json:"id" bson:"_id"` // id,用户的mid
StreamerRecommOffset int64 `json:"streamer_recomm_offset" bson:"streamer_recomm_offset"` // 主播推荐列表偏移量
BottomFlag *int64 `json:"bottom_flag" bson:"bottom_flag"` // 是否触底标志
Ver int64 `json:"ver" bson:"ver"` // 乐观锁
}
func (p *UserVisitOffset) GetBottomFlag() int64 {
if p == nil || p.BottomFlag == nil {
return 0
}
return *p.BottomFlag
}