diff --git a/api/consts/option.go b/api/consts/option.go index 542a0153..16014830 100644 --- a/api/consts/option.go +++ b/api/consts/option.go @@ -13,4 +13,5 @@ const ( const ( Recomm_Down = 0 Recomm_Up = 1 + Recomm_Init = 2 ) diff --git a/app/mix/dao/mongo.go b/app/mix/dao/mongo.go index d8080300..72b44af9 100644 --- a/app/mix/dao/mongo.go +++ b/app/mix/dao/mongo.go @@ -3318,10 +3318,14 @@ func (m *Mongo) UpdateUserVisitOffset(ctx *gin.Context, uservisitoffset *dbstruc "_id": uservisitoffset.Id, "ver": uservisitoffset.Ver, } + set := qmgo.M{ + "streamer_recomm_offset": uservisitoffset.StreamerRecommOffset, + } + if uservisitoffset.BottomFlag != nil { + set["bottom_flag"] = uservisitoffset.GetBottomFlag() + } update := qmgo.M{ - "$set": qmgo.M{ - "streamer_recomm_offset": uservisitoffset.StreamerRecommOffset, - }, + "$set": set, } err := col.UpdateOne(ctx, filter, update) return err diff --git a/app/mix/service/apiservice.go b/app/mix/service/apiservice.go index 88a9591c..c985b32e 100644 --- a/app/mix/service/apiservice.go +++ b/app/mix/service/apiservice.go @@ -1244,20 +1244,11 @@ func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto. } } - if req.OpType == consts.Recomm_Up { - recommStreamerList, err = s.utilGetUpStreamerRecommList(ctx, recommlist, req.Mid) - if err != nil { - logger.Error("utilGetUpStreamerRecommList fail, err: %v", err) - ec = errcode.ErrCodeStreamerSrvFail - return - } - } else { - recommStreamerList, err = s.utilGetUpStreamerRecommList(ctx, recommlist, req.Mid) - if err != nil { - logger.Error("utilGetUpStreamerRecommList fail, err: %v", err) - ec = errcode.ErrCodeStreamerSrvFail - return - } + recommStreamerList, err = s.utilGetStreamerRecommListVO(ctx, recommlist, req.Mid, req.OpType) + if err != nil { + logger.Error("utilGetUpStreamerRecommList fail, err: %v", err) + ec = errcode.ErrCodeStreamerSrvFail + return } return diff --git a/app/mix/service/utilservice.go b/app/mix/service/utilservice.go index f81ed30b..099d01ce 100644 --- a/app/mix/service/utilservice.go +++ b/app/mix/service/utilservice.go @@ -2,7 +2,6 @@ package service import ( "fmt" - "math" "service/api/consts" "service/api/errcode" "service/api/interfaces" @@ -27,6 +26,7 @@ import ( "time" "github.com/gin-gonic/gin" + "github.com/qiniu/qmgo" "go.mongodb.org/mongo-driver/mongo" goproto "google.golang.org/protobuf/proto" ) @@ -666,18 +666,27 @@ func (s *Service) utilGetStreamerRecommList(ctx *gin.Context) (recommlist []int6 return } -func (s *Service) utilGetUserVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (offset int64, err error) { - offset = int64(0) +func (s *Service) utilGetInitUserVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (err error) { uservisitoffset, err := _DefaultUserVisitOffset.OpGetUserVisitOffset(ctx, mid) if err != nil { logger.Error("OpGetUserVisitOffset fail, err: %v", err) return } + + var execFunc func(*gin.Context, *dbstruct.UserVisitOffset) error + if uservisitoffset == nil { - nowoffset := (offset + consts.StreamerRecommThroughput) % recommlistLength - err = _DefaultUserVisitOffset.OpCreate(ctx, &dbstruct.UserVisitOffset{ + execFunc = _DefaultUserVisitOffset.OpCreate + } else { + execFunc = _DefaultUserVisitOffset.OpUpdate + } + + // 吞吐量大于等于推荐数组长度,则这次获取后已经触底 + if consts.StreamerRecommThroughput >= recommlistLength { + err = execFunc(ctx, &dbstruct.UserVisitOffset{ Id: mid, - StreamerRecommOffset: nowoffset, + StreamerRecommOffset: 0, + BottomFlag: goproto.Int64(1), Ver: 0, }) if err != nil { @@ -685,8 +694,32 @@ func (s *Service) utilGetUserVisitOffset(ctx *gin.Context, mid int64, recommlist return } } else { - offset = uservisitoffset.StreamerRecommOffset - nowoffset := (offset + consts.StreamerRecommThroughput) % recommlistLength + err = execFunc(ctx, &dbstruct.UserVisitOffset{ + Id: mid, + StreamerRecommOffset: consts.StreamerRecommThroughput, + BottomFlag: goproto.Int64(0), + Ver: 0, + }) + if err != nil { + logger.Error("_DefaultUserVisitOffset OpCreate fail, err: %v", err) + return + } + } + + return nil +} + +func (s *Service) utilGetUpUserVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (uservisitoffset *dbstruct.UserVisitOffset, err error) { + uservisitoffset, err = _DefaultUserVisitOffset.OpGetUserVisitOffset(ctx, mid) + if err != nil { + logger.Error("OpGetUserVisitOffset fail, err: %v", err) + return + } + if uservisitoffset == nil { + // 默认用户刚进推荐页面的第一次操作一定是冷启动操作,游标也由冷启动创建 + return nil, qmgo.ErrNoSuchDocuments + } else { + nowoffset := (uservisitoffset.StreamerRecommOffset + consts.StreamerRecommThroughput) % recommlistLength err = _DefaultUserVisitOffset.OpUpdate(ctx, &dbstruct.UserVisitOffset{ Id: uservisitoffset.Id, StreamerRecommOffset: nowoffset, @@ -700,70 +733,113 @@ func (s *Service) utilGetUserVisitOffset(ctx *gin.Context, mid int64, recommlist return } -func (s *Service) utilGetUpStreamerRecommList(ctx *gin.Context, recommlist []int64, mid int64) (recommStreamerList []*streamerproto.ApiListExtVO, err error) { - // 获取用户游标 - offset := int64(0) - recommListLength := int64(len(recommlist)) - if consts.StreamerRecommThroughput < recommListLength { - offset, err = s.utilGetUserVisitOffset(ctx, mid, recommListLength) - if err != nil { - logger.Error("utilGetUserVisitOffset fail, err: %v", err) +func (s *Service) utilGetDownUserVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (uservisitoffset *dbstruct.UserVisitOffset, err error) { + uservisitoffset, err = _DefaultUserVisitOffset.OpGetUserVisitOffset(ctx, mid) + if err != nil { + logger.Error("OpGetUserVisitOffset fail, err: %v", err) + return + } + + if uservisitoffset == nil { + // 默认用户刚进推荐页面的第一次操作一定是冷启动操作,游标也由冷启动创建 + return nil, qmgo.ErrNoSuchDocuments + } else { + if uservisitoffset.GetBottomFlag() == 1 { return } + offset := uservisitoffset.StreamerRecommOffset + if offset+consts.StreamerRecommThroughput >= recommlistLength { + err = _DefaultUserVisitOffset.OpUpdate(ctx, &dbstruct.UserVisitOffset{ + Id: uservisitoffset.Id, + StreamerRecommOffset: 0, + BottomFlag: goproto.Int64(1), + Ver: uservisitoffset.Ver, + }) + if err != nil { + logger.Error("_DefaultUserVisitOffset OpUpdate fail, err: %v", err) + return + } + } else { + err = _DefaultUserVisitOffset.OpUpdate(ctx, &dbstruct.UserVisitOffset{ + Id: uservisitoffset.Id, + StreamerRecommOffset: offset + consts.StreamerRecommThroughput, + Ver: uservisitoffset.Ver, + }) + if err != nil { + logger.Error("_DefaultUserVisitOffset OpUpdate fail, err: %v", err) + return + } + } } - - // 根据用户游标查询得到结果 - midList := make([]int64, 0) - for i := 0; i < int(math.Min(float64(consts.StreamerRecommThroughput), float64(recommListLength))); i++ { - index := (offset + int64(i)) % recommListLength - midList = append(midList, recommlist[index]) - } - accountList, err := _DefaultAccount.OpListByMids(ctx, &accountproto.OpListByMidsReq{ - Mids: midList, - Sort: []string{"_id"}, - }) - if err != nil { - logger.Error("_DefaultAccount OpListByMids fail, err: %v", err) - return - } - - streamerExts := make([]streamerproto.StreamerExtVO, len(accountList)) - for i := range streamerExts { - streamerExts[i] = &streamerproto.ApiListExtVO{} - } - ec := s.utilExtendAccountsIntoStreamerExts(ctx, accountList, streamerExts) - if ec != errcode.ErrCodeStreamerSrvOk { - err = fmt.Errorf("Extend accountlist into streamer_exts fail") - logger.Error("Extend accountlist into streamer_exts fail") - return - } - recommStreamerList = make([]*streamerproto.ApiListExtVO, len(streamerExts)) - for i, streamerExt := range streamerExts { - vo, _ := streamerExt.(*streamerproto.ApiListExtVO) - recommStreamerList[i] = vo - } - return } -func (s *Service) utilGetDownStreamerRecommList(ctx *gin.Context, recommlist []int64, mid int64) (recommStreamerList []*streamerproto.ApiListExtVO, err error) { - // 获取用户游标 - offset := int64(0) - recommListLength := int64(len(recommlist)) - if consts.StreamerRecommThroughput < recommListLength { - offset, err = s.utilGetUserVisitOffset(ctx, mid, recommListLength) +// 获取该用户当前在主播推荐数组中的游标和是否已经触底标志 +func (s *Service) utilGetStreamerRecommlistOffsetAndBottomeFlag(ctx *gin.Context, recommListLength int64, mid int64, opType int64) (offset int64, bottomFlag int64, err error) { + offset = int64(0) + var uservisitoffset *dbstruct.UserVisitOffset + + // 初始化操作 + switch opType { + case consts.Recomm_Init: + err = s.utilGetInitUserVisitOffset(ctx, mid, int64(recommListLength)) if err != nil { - logger.Error("utilGetUserVisitOffset fail, err: %v", err) + logger.Error("utilGetInitUserVisitOffset fail, err: %v", err) return } + // 向上滚动操作 + case consts.Recomm_Up: + // 若吞吐量比推荐数组长度小,则正常操作,否则认为游标永远是0 + if consts.StreamerRecommThroughput < recommListLength { + uservisitoffset, err = s.utilGetUpUserVisitOffset(ctx, mid, int64(recommListLength)) + if err != nil { + logger.Error("utilGetUpUserVisitOffset fail, err: %v", err) + return + } + offset = uservisitoffset.StreamerRecommOffset + } + // 向下滚动操作 + case consts.Recomm_Down: + // 若吞吐量比推荐数组长度小,则正常操作,否则认为直接触底 + if consts.StreamerRecommThroughput < recommListLength { + uservisitoffset, err = s.utilGetDownUserVisitOffset(ctx, mid, int64(recommListLength)) + if err != nil { + logger.Error("utilGetUpUserVisitOffset fail, err: %v", err) + return + } + if uservisitoffset.GetBottomFlag() == 1 { + return 0, 1, nil + } + offset = uservisitoffset.StreamerRecommOffset + } else { + return 0, 1, nil + } + } + return +} + +func (s *Service) utilGetStreamerRecommListVO(ctx *gin.Context, recommlist []int64, mid int64, opType int64) (recommStreamerList []*streamerproto.ApiListExtVO, err error) { + // 获取用户游标 + offset := int64(0) + recommListLength := len(recommlist) + upperBound := recommListLength + + offset, bottomFlag, err := s.utilGetStreamerRecommlistOffsetAndBottomeFlag(ctx, int64(recommListLength), mid, opType) + if err != nil { + logger.Error("utilGetStreamerRecommlistOffsetAndBottomeFlag fail, err: %v", err) + return + } + if bottomFlag == 1 { // 已触底 + return make([]*streamerproto.ApiListExtVO, 0), nil } // 根据用户游标查询得到结果 midList := make([]int64, 0) - for i := 0; i < int(math.Min(float64(consts.StreamerRecommThroughput), float64(recommListLength))); i++ { - index := (offset + int64(i)) % recommListLength + for i := 0; i < upperBound; i++ { + index := (offset + int64(i)) % int64(recommListLength) midList = append(midList, recommlist[index]) } + accountList, err := _DefaultAccount.OpListByMids(ctx, &accountproto.OpListByMidsReq{ Mids: midList, Sort: []string{"_id"}, @@ -773,6 +849,17 @@ func (s *Service) utilGetDownStreamerRecommList(ctx *gin.Context, recommlist []i return } + // 构建一个accountList -> midList 索引的map + _accountIndexMap := make(map[int]int) + for i, account := range accountList { + mid1 := util.DerefInt64(account.Mid) + for j, mid2 := range midList { + if mid1 == mid2 { + _accountIndexMap[i] = j + } + } + } + streamerExts := make([]streamerproto.StreamerExtVO, len(accountList)) for i := range streamerExts { streamerExts[i] = &streamerproto.ApiListExtVO{} @@ -786,7 +873,8 @@ func (s *Service) utilGetDownStreamerRecommList(ctx *gin.Context, recommlist []i recommStreamerList = make([]*streamerproto.ApiListExtVO, len(streamerExts)) for i, streamerExt := range streamerExts { vo, _ := streamerExt.(*streamerproto.ApiListExtVO) - recommStreamerList[i] = vo + // 从下标位置映射回去 + recommStreamerList[_accountIndexMap[i]] = vo } return diff --git a/dbstruct/user_visit_offset.go b/dbstruct/user_visit_offset.go index 037e3233..15acfd6d 100644 --- a/dbstruct/user_visit_offset.go +++ b/dbstruct/user_visit_offset.go @@ -1,7 +1,15 @@ package dbstruct type UserVisitOffset struct { - Id int64 `json:"id" bson:"_id"` //id,用户的mid - StreamerRecommOffset int64 `json:"streamer_recomm_offset" bson:"streamer_recomm_offset"` //主播推荐列表偏移量 - Ver int64 `json:"ver" bson:"ver"` //乐观锁 + Id int64 `json:"id" bson:"_id"` // id,用户的mid + StreamerRecommOffset int64 `json:"streamer_recomm_offset" bson:"streamer_recomm_offset"` // 主播推荐列表偏移量 + BottomFlag *int64 `json:"bottom_flag" bson:"bottom_flag"` // 是否触底标志 + Ver int64 `json:"ver" bson:"ver"` // 乐观锁 +} + +func (p *UserVisitOffset) GetBottomFlag() int64 { + if p == nil || p.BottomFlag == nil { + return 0 + } + return *p.BottomFlag }