diff --git a/api/proto/streamer/proto/streamer_api.go b/api/proto/streamer/proto/streamer_api.go index a8d6b598..b518bd91 100644 --- a/api/proto/streamer/proto/streamer_api.go +++ b/api/proto/streamer/proto/streamer_api.go @@ -24,7 +24,7 @@ type ApiListReq struct { base.BaseRequest Offset int `json:"offset"` Limit int `json:"limit"` - Sort string + Sort []string } type ApiListData struct { diff --git a/api/proto/streamer/proto/streamer_op.go b/api/proto/streamer/proto/streamer_op.go index 182e3a35..dfbeb256 100644 --- a/api/proto/streamer/proto/streamer_op.go +++ b/api/proto/streamer/proto/streamer_op.go @@ -52,7 +52,7 @@ type OpListReq struct { base.BaseRequest Offset int `json:"offset"` Limit int `json:"limit"` - Sort string + Sort []string } type OpListData struct { diff --git a/app/mix/controller/init.go b/app/mix/controller/init.go index 623519a6..f8592b27 100644 --- a/app/mix/controller/init.go +++ b/app/mix/controller/init.go @@ -313,7 +313,7 @@ func Init(r *gin.Engine) { opStreamerGroup.POST("list_ext_fuzzily_by_user_id", middleware.JSONParamValidator(streamerproto.OpListExtFuzzilyByUserIdReq{}), middleware.JwtAuthenticator(), OpGetStreamerExtListFuzzilyByUserId) opStreamerGroup.POST("list_ext_fuzzily_by_name", middleware.JSONParamValidator(streamerproto.OpListExtFuzzilyByNameReq{}), middleware.JwtAuthenticator(), OpGetStreamerExtListFuzzilyByName) opStreamerGroup.POST("list_wx_id", middleware.JSONParamValidator(streamerproto.OpListStreamerWxIdReq{}), middleware.JwtAuthenticator(), OpGetStreamerWxId) - opStreamerGroup.POST("recomm_list", middleware.JSONParamValidator(streamerproto.OpRecommListReq{}), middleware.JwtAuthenticator(), OpGetStreamerRecommList) + //opStreamerGroup.POST("recomm_list", middleware.JSONParamValidator(streamerproto.OpRecommListReq{}), middleware.JwtAuthenticator(), OpGetStreamerRecommList) // 意见反馈 opFeedbackGroup := r.Group("/op/feedback", PrepareOp()) diff --git a/app/mix/controller/streamer_op.go b/app/mix/controller/streamer_op.go index c847d2c1..082fe745 100644 --- a/app/mix/controller/streamer_op.go +++ b/app/mix/controller/streamer_op.go @@ -242,19 +242,3 @@ func OpGetStreamerWxId(ctx *gin.Context) { } ReplyOk(ctx, data) } - -func OpGetStreamerRecommList(ctx *gin.Context) { - req := ctx.MustGet("client_req").(*streamerproto.OpRecommListReq) - - list, ec := service.DefaultService.OpGetStreamerRecommList(ctx, req) - if ec != errcode.ErrCodeStreamerSrvOk { - logger.Error("OpGetStreamerRecommList fail, req: %v, ec: %v", util.ToJson(req), ec) - ReplyErrCodeMsg(ctx, ec) - return - } - - data := &streamerproto.OpRecommListData{ - RecommList: list, - } - ReplyOk(ctx, data) -} diff --git a/app/mix/dao/mongo.go b/app/mix/dao/mongo.go index d50b2424..339223a3 100644 --- a/app/mix/dao/mongo.go +++ b/app/mix/dao/mongo.go @@ -2034,10 +2034,10 @@ func (m *Mongo) GetStreamerList(ctx *gin.Context, req *streamerproto.OpListReq) "del_flag": 0, } //排序规则 - if req.Sort == "" { - req.Sort = "-ct" + if len(req.Sort) == 0 { + req.Sort = append(req.Sort, "-ct") } - err := col.Find(ctx, query).Sort(req.Sort).Skip(int64(req.Offset)).Limit(int64(req.Limit)).All(&list) + err := col.Find(ctx, query).Sort(req.Sort...).Skip(int64(req.Offset)).Limit(int64(req.Limit)).All(&list) if err == qmgo.ErrNoSuchDocuments { err = nil return list, err diff --git a/app/mix/service/apiservice.go b/app/mix/service/apiservice.go index 84a02f87..ed3480d0 100644 --- a/app/mix/service/apiservice.go +++ b/app/mix/service/apiservice.go @@ -32,6 +32,7 @@ import ( "service/library/contentaudit/imageaudit" "service/library/contentaudit/textaudit" "service/library/logger" + "service/library/redis" "go.mongodb.org/mongo-driver/mongo" goproto "google.golang.org/protobuf/proto" @@ -1550,6 +1551,21 @@ func (s *Service) ApiGetContactCustomerServiceSessionListByMid(ctx *gin.Context, func (s *Service) ApiCreateMoment(ctx *gin.Context, req *momentproto.ApiCreateReq) (ec errcode.ErrCode) { ec = errcode.ErrCodeMomentSrvOk + // 创建完动态,将动态加载到当前redis数组 + defer func() { + if req.Moment.Id == nil { + return + } + go func() { + id := util.DerefInt64(req.Moment.Id) + err := redis.GetRedisClient().RPush(consts.RedisMomentPrefix+"recent_list", id) + if err != nil { + logger.Error("Push newly-created moment to list failed : %v", err) + return + } + }() + }() + req.Moment.Mid = goproto.Int64(req.BaseRequest.Mid) if ec = s.ApiCreateMomentBusinessValidate(ctx, req); ec != errcode.ErrCodeMomentSrvOk { @@ -1904,17 +1920,32 @@ func (s *Service) ApiGetMomentRecommList(ctx *gin.Context, req *momentproto.ApiR ec = errcode.ErrCodeMomentSrvOk - // 从redis中获取主播列表 - recommlist, err := s.utilGetMomentRecommList(ctx) + // 1.查找本时段有无新发动态 + ids, err := s.utilGetMomentRecentListIds(ctx, req.Mid, req.OpType, consts.MomentRecommThroughput) if err != nil { - logger.Error("utilGetMomentRecommList fail, err: %v", err) + logger.Error("utilGetMomentRecentListIds fail, err: %v", err) ec = errcode.ErrCodeMomentSrvFail return } - recommMomentList, err = s.utilGetMomentRecommListVO(ctx, recommlist, req.Mid, req.OpType) + // 2.删去已查询的新动态 + recommThroughput := int64(consts.MomentRecommThroughput - len(ids)) + + // 3.补齐后续动态 + if recommThroughput > 0 { + recommIds, err := s.utilGetMomentRecommListIds(ctx, req.Mid, req.OpType, recommThroughput) + if err != nil { + logger.Error("utilGetMomentRecommListIds fail, err: %v", err) + ec = errcode.ErrCodeMomentSrvFail + return + } + ids = append(ids, recommIds...) + } + + // 4.查询动态内容 + recommMomentList, err = s.utilGetApiMomentVOListByIds(ctx, req.Mid, ids) if err != nil { - logger.Error("utilGetUpMomentRecommList fail, err: %v", err) + logger.Error("utilGetMomentRecentListIds fail, err: %v", err) ec = errcode.ErrCodeMomentSrvFail return } diff --git a/app/mix/service/service.go b/app/mix/service/service.go index 29eadb98..864bbb09 100644 --- a/app/mix/service/service.go +++ b/app/mix/service/service.go @@ -50,7 +50,6 @@ import ( "service/library/mycrypto" "service/library/payclients/alipaycli" "service/library/payclients/wxpaycli" - "service/library/redis" "go.mongodb.org/mongo-driver/mongo" goproto "google.golang.org/protobuf/proto" @@ -1982,56 +1981,6 @@ func (s *Service) OpGetStreamerWxId(ctx *gin.Context, req *streamerproto.OpListS return } -// 推荐 -func (s *Service) OpGetStreamerRecommList(ctx *gin.Context, req *streamerproto.OpRecommListReq) (recommlist []int64, ec errcode.ErrCode) { - - ec = errcode.ErrCodeStreamerSrvOk - - //1.从redis中获取数据 - err := redis.GetRedisClient().GetObject(consts.RedisStreamerPrefix+"recomm_list", &recommlist) - if err != nil { - logger.Error("Redis read failed : %v", err) - ec = errcode.ErrCodeStreamerRecommListRedisCacheInvalid - return - } - - //2.若redis命中失败,再从数据库查 - if len(recommlist) == 0 { - logger.Error("Redis hit failed, reading recommendation list from mongo...") - - // recommlist, _, err = _DefaultAccountRelation.OpIsFollowedCount(ctx) - // if err != nil { - // logger.Error("OpIsFollowedCount fail, err: %v", err) - // ec = errcode.ErrCodeAccountRelationSrvFail - // return - // } - - list, err := _DefaultStreamer.OpList(ctx, &streamerproto.OpListReq{ - Sort: "-fans", - }) - if err != nil { - logger.Error("OpList fail, err: %v", err) - ec = errcode.ErrCodeAccountRelationSrvFail - return - } - recommlist = make([]int64, len(list)) - for i, streamer := range list { - recommlist[i] = util.DerefInt64(streamer.Mid) - } - - //若数据库命中成功,则立即加载进redis - if len(recommlist) != 0 { - err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", recommlist, 0) - if err != nil { - logger.Error("Redis cache fail, err: %v", err) - } - - } - } - - return -} - // Feedback func (s *Service) OpCreateFeedback(ctx *gin.Context, req *feedbackproto.OpCreateReq) (ec errcode.ErrCode) { ec = errcode.ErrCodeFeedbackSrvOk diff --git a/app/mix/service/utilservice.go b/app/mix/service/utilservice.go index de6f92b6..b933909c 100644 --- a/app/mix/service/utilservice.go +++ b/app/mix/service/utilservice.go @@ -643,7 +643,7 @@ func (s *Service) utilGetStreamerRecommList(ctx *gin.Context) (recommlist []int6 logger.Error("Redis hit failed, reading recommendation list from mongo...") list, err := _DefaultStreamer.OpList(ctx, &streamerproto.OpListReq{ - Sort: "-fans", + Sort: []string{"-fans"}, }) if err != nil { logger.Error("OpList fail, err: %v", err) @@ -890,44 +890,7 @@ func (s *Service) utilGetStreamerRecommListVO(ctx *gin.Context, recommlist []int return } -func (s *Service) utilGetMomentRecommList(ctx *gin.Context) (recommlist []int64, err error) { - - // 1.从redis中获取数据 - err = redis.GetRedisClient().GetObject(consts.RedisMomentPrefix+"recomm_list", &recommlist) - if err != nil { - logger.Error("Redis read failed : %v", err) - return - } - - // 2.若redis命中失败,再从数据库查 - if len(recommlist) == 0 { - logger.Error("Redis hit failed, reading recommendation list from mongo...") - - list, err := _DefaultMoment.OpList(ctx, &momentproto.OpListReq{ - CtLowerBound: goproto.Int64(0), - }) - if err != nil { - logger.Error("OpList fail, err: %v", err) - return nil, err - } - recommlist = make([]int64, len(list)) - for i, moment := range list { - recommlist[i] = util.DerefInt64(moment.Id) - } - - //若数据库命中成功,则立即加载进redis - if len(recommlist) != 0 { - err := redis.GetRedisClient().Set(consts.RedisMomentPrefix+"recomm_list", recommlist, 0) - if err != nil { - logger.Error("Redis cache fail, err: %v", err) - } - } - } - - return -} - -func (s *Service) utilGetInitUserMomentVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (err error) { +func (s *Service) utilGetInitUserMomentVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64, throughput int64) (err error) { uservisitoffset, err := _DefaultUserMomentVisitOffset.OpGetUserMomentVisitOffset(ctx, mid) if err != nil { logger.Error("OpGetUserMomentVisitOffset fail, err: %v", err) @@ -943,7 +906,7 @@ func (s *Service) utilGetInitUserMomentVisitOffset(ctx *gin.Context, mid int64, } // 吞吐量大于等于推荐数组长度,则这次获取后已经触底 - if consts.MomentRecommThroughput >= recommlistLength { + if throughput >= recommlistLength { err = execFunc(ctx, &dbstruct.UserMomentVisitOffset{ Id: mid, MomentRecommOffset: 0, @@ -970,7 +933,7 @@ func (s *Service) utilGetInitUserMomentVisitOffset(ctx *gin.Context, mid int64, return nil } -func (s *Service) utilGetUpUserMomentVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (uservisitoffset *dbstruct.UserMomentVisitOffset, err error) { +func (s *Service) utilGetUpUserMomentVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64, throughput int64) (uservisitoffset *dbstruct.UserMomentVisitOffset, err error) { uservisitoffset, err = _DefaultUserMomentVisitOffset.OpGetUserMomentVisitOffset(ctx, mid) if err != nil { logger.Error("OpGetUserMomentVisitOffset fail, err: %v", err) @@ -980,7 +943,7 @@ func (s *Service) utilGetUpUserMomentVisitOffset(ctx *gin.Context, mid int64, re // 默认用户刚进推荐页面的第一次操作一定是冷启动操作,游标也由冷启动创建 return nil, qmgo.ErrNoSuchDocuments } else { - nowoffset := (uservisitoffset.MomentRecommOffset + consts.MomentRecommThroughput) % recommlistLength + nowoffset := (uservisitoffset.MomentRecommOffset + throughput) % recommlistLength // 向上操作固定清掉触底标志 var bottomFlagPtr *int64 = nil if uservisitoffset.GetBottomFlag() == 1 { @@ -1000,7 +963,7 @@ func (s *Service) utilGetUpUserMomentVisitOffset(ctx *gin.Context, mid int64, re return } -func (s *Service) utilGetDownUserMomentVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (uservisitoffset *dbstruct.UserMomentVisitOffset, err error) { +func (s *Service) utilGetDownUserMomentVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64, throughput int64) (uservisitoffset *dbstruct.UserMomentVisitOffset, err error) { uservisitoffset, err = _DefaultUserMomentVisitOffset.OpGetUserMomentVisitOffset(ctx, mid) if err != nil { logger.Error("OpGetUserMomentVisitOffset fail, err: %v", err) @@ -1015,7 +978,7 @@ func (s *Service) utilGetDownUserMomentVisitOffset(ctx *gin.Context, mid int64, return } offset := uservisitoffset.MomentRecommOffset - if offset+consts.MomentRecommThroughput >= recommlistLength { + if offset+throughput >= recommlistLength { err = _DefaultUserMomentVisitOffset.OpUpdate(ctx, &dbstruct.UserMomentVisitOffset{ Id: uservisitoffset.Id, MomentRecommOffset: 0, @@ -1029,7 +992,7 @@ func (s *Service) utilGetDownUserMomentVisitOffset(ctx *gin.Context, mid int64, } else { err = _DefaultUserMomentVisitOffset.OpUpdate(ctx, &dbstruct.UserMomentVisitOffset{ Id: uservisitoffset.Id, - MomentRecommOffset: offset + consts.MomentRecommThroughput, + MomentRecommOffset: offset + throughput, Ver: uservisitoffset.Ver, }) if err != nil { @@ -1042,88 +1005,159 @@ func (s *Service) utilGetDownUserMomentVisitOffset(ctx *gin.Context, mid int64, } // 获取该用户当前在主播推荐数组中的游标和是否已经触底标志 -func (s *Service) utilGetMomentRecommlistOffsetAndBottomeFlag(ctx *gin.Context, recommListLength int64, mid int64, opType int64) (offset int64, bottomFlag int64, err error) { +func (s *Service) utilGetMomentRecommlistOffsetAndBottomeFlag(ctx *gin.Context, mid int64, opType int64, recommListLength int64, throughput int64) (offset int64, isAtBottom int64, err error) { offset = int64(0) + isRecommListSuffi := throughput <= recommListLength var uservisitoffset *dbstruct.UserMomentVisitOffset // 初始化操作 switch opType { case consts.Recomm_Init: - err = s.utilGetInitUserMomentVisitOffset(ctx, mid, int64(recommListLength)) + err = s.utilGetInitUserMomentVisitOffset(ctx, mid, recommListLength, throughput) if err != nil { logger.Error("utilGetInitUserMomentVisitOffset fail, err: %v", err) return } // 向上滚动操作 case consts.Recomm_Up: - // 若吞吐量比推荐数组长度小,则正常操作,否则认为游标永远是0 - if consts.MomentRecommThroughput < recommListLength { - uservisitoffset, err = s.utilGetUpUserMomentVisitOffset(ctx, mid, int64(recommListLength)) - if err != nil { - logger.Error("utilGetUpUserMomentVisitOffset fail, err: %v", err) - return - } - offset = uservisitoffset.MomentRecommOffset + // 若吞吐量比推荐数组长度小,则正常操作,否则认为直接触顶(游标永远是0) + if !isRecommListSuffi { + return } + uservisitoffset, err = s.utilGetUpUserMomentVisitOffset(ctx, mid, recommListLength, throughput) + if err != nil { + logger.Error("utilGetUpUserMomentVisitOffset fail, err: %v", err) + return + } + offset = uservisitoffset.MomentRecommOffset // 向下滚动操作 case consts.Recomm_Down: // 若吞吐量比推荐数组长度小,则正常操作,否则认为直接触底 - if consts.MomentRecommThroughput < recommListLength { - uservisitoffset, err = s.utilGetDownUserMomentVisitOffset(ctx, mid, int64(recommListLength)) - if err != nil { - logger.Error("utilGetUpUserMomentVisitOffset fail, err: %v", err) - return - } - if uservisitoffset.GetBottomFlag() == 1 { - return 0, 1, nil - } - offset = uservisitoffset.MomentRecommOffset - } else { + if !isRecommListSuffi { return 0, 1, nil } + uservisitoffset, err = s.utilGetDownUserMomentVisitOffset(ctx, mid, recommListLength, throughput) + if err != nil { + logger.Error("utilGetUpUserMomentVisitOffset fail, err: %v", err) + return + } + if uservisitoffset.GetBottomFlag() == 1 { + return 0, 1, nil + } + offset = uservisitoffset.MomentRecommOffset } return } -func (s *Service) utilGetMomentRecommListVO(ctx *gin.Context, recommlist []int64, mid int64, opType int64) (recommMomentList []*momentproto.ApiMomentVO, err error) { +// 根据提供的用户mid和操作类型,从最近推荐列表中获取吞吐量大小的列表,并更新用户游标 +func (s *Service) utilGetMomentRecentListIds(ctx *gin.Context, mid int64, opType int64, throughput int64) (ids []int64, err error) { + + if opType != consts.Recomm_Up { + return + } + + // 从redis中获取动态列表长度 + recentListLength, err := redis.GetRedisClient().LLen(consts.RedisMomentPrefix + "recent_list") + if err != nil { + logger.Error("Redis read failed : %v", err) + return + } + if recentListLength == 0 { + return + } + + // 获取用户游标 + offset, err := redis.GetRedisClient().HGetInt64(consts.RedisMomentPrefix+"recent_list_offset", fmt.Sprint(mid)) + if err != nil && !redis.GetRedisClient().IsErrNil(err) { + logger.Error("Redis read failed : %v", err) + return + } + // 检验游标是否还可用 + ids = make([]int64, 0) + incr := 0 + for ; incr < int(throughput) && offset < recentListLength; incr++ { + id, err := redis.GetRedisClient().LIndexInt64(consts.RedisMomentPrefix+"recent_list", int(offset)) + if err != nil { + logger.Error("Redis read failed : %v", err) + return make([]int64, 0), err + } + ids = append(ids, id) + offset++ + } + _, err = redis.GetRedisClient().HIncrby(consts.RedisMomentPrefix+"recent_list_offset", fmt.Sprint(mid), incr) + if err != nil && !redis.GetRedisClient().IsErrNil(err) { + logger.Error("Redis cache failed : %v", err) + return + } + + return +} + +// 根据提供的用户mid和操作类型,从推荐列表中获取吞吐量大小的列表,并更新用户游标 +func (s *Service) utilGetMomentRecommListIds(ctx *gin.Context, mid int64, opType int64, throughput int64) (ids []int64, err error) { + // 从redis中获取动态列表长度 + recommListLength, err := redis.GetRedisClient().LLen(consts.RedisMomentPrefix + "recomm_list") + if err != nil { + logger.Error("Redis read failed : %v", err) + return + } + if recommListLength == 0 { + return + } + // 获取用户游标 offset := int64(0) - recommListLength := len(recommlist) upperBound := recommListLength - offset, bottomFlag, err := s.utilGetMomentRecommlistOffsetAndBottomeFlag(ctx, int64(recommListLength), mid, opType) + // 获取游标和是否已触底标志 + offset, bottomFlag, err := s.utilGetMomentRecommlistOffsetAndBottomeFlag(ctx, mid, opType, recommListLength, throughput) if err != nil { logger.Error("utilGetMomentRecommlistOffsetAndBottomeFlag fail, err: %v", err) return } if bottomFlag == 1 { // 已触底 + return make([]int64, 0), nil + } + + if throughput < recommListLength { + upperBound = throughput + } + + // 根据用户游标得到待查询ids + ids = make([]int64, 0) + for i := int64(0); i < upperBound; i++ { + index := (offset + i) % recommListLength + id, err := redis.GetRedisClient().LIndexInt64(consts.RedisMomentPrefix+"recomm_list", int(index)) + if err != nil { + logger.Error("Redis read failed : %v", err) + return make([]int64, 0), err + } + ids = append(ids, id) + } + + return +} + +// 根据提供的ids,查询出动态,并填充主播、是否关注、是否点赞,并保证按照提供的顺序返回ApiMomentVO的list +func (s *Service) utilGetApiMomentVOListByIds(ctx *gin.Context, visitorMid int64, ids []int64) (volist []*momentproto.ApiMomentVO, err error) { + + if len(ids) == 0 { return make([]*momentproto.ApiMomentVO, 0), nil } - if consts.MomentRecommThroughput < recommListLength { - upperBound = consts.MomentRecommThroughput - } - - // 根据用户游标查询得到结果 - idList := make([]int64, 0) - for i := 0; i < upperBound; i++ { - index := (offset + int64(i)) % int64(recommListLength) - idList = append(idList, recommlist[index]) - } - list, err := _DefaultMoment.OpListByIds(ctx, &momentproto.OpListByIdsReq{ - Ids: idList, + Ids: ids, }) if err != nil { logger.Error("_DefaultMoment OpListByIds fail, , err: %v", err) return } - // list -> idList 索引的map + // list -> idList 索引的map,保证不打乱顺序 _momentIndexMap := make(map[int]int) for i, account := range list { id1 := util.DerefInt64(account.Id) - for j, id2 := range idList { + for j, id2 := range ids { if id1 == id2 { _momentIndexMap[i] = j } @@ -1131,7 +1165,7 @@ func (s *Service) utilGetMomentRecommListVO(ctx *gin.Context, recommlist []int64 } // 获取访问者的关注列表 - followMap, err := s.utilGetFollowMap(ctx, mid) + followMap, err := s.utilGetFollowMap(ctx, visitorMid) if err != nil { logger.Error("utilGetFollowMap fail") return @@ -1143,17 +1177,17 @@ func (s *Service) utilGetMomentRecommListVO(ctx *gin.Context, recommlist []int64 logger.Error("utilFillMomentsStreamerInfo fail, , err: %v", err) return } - recommMomentList = make([]*momentproto.ApiMomentVO, len(vos)) + volist = make([]*momentproto.ApiMomentVO, len(vos)) for i, vo := range vos { apiVO, _ := vo.(*momentproto.ApiMomentVO) // 填充是否关注 s.utilFillIsFollowedFillable(ctx, followMap, apiVO) // 填充是否点赞 - if err = s.utilFillIsThumbedUpFillable(ctx, mid, apiVO); err != nil { + if err = s.utilFillIsThumbedUpFillable(ctx, visitorMid, apiVO); err != nil { logger.Error("utilFillIsThumbedUpFillable fail") return } - recommMomentList[_momentIndexMap[i]] = apiVO + volist[_momentIndexMap[i]] = apiVO } return diff --git a/app/mix/service/xxljob_tasks.go b/app/mix/service/xxljob_tasks.go index c1cc4544..0f8ee1ba 100644 --- a/app/mix/service/xxljob_tasks.go +++ b/app/mix/service/xxljob_tasks.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "service/api/consts" - "service/api/errcode" accountproto "service/api/proto/account/proto" contact_customer_service_proto "service/api/proto/contact_customer_service/proto" daily_statementproto "service/api/proto/daily_statement/proto" @@ -29,18 +28,19 @@ import ( func (s *CronService) ReloadRecommList(ctx context.Context, param *xxl.RunReq) (msg string) { logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID)) logger.Info("Refreshing recommendation list cached in redis...") - list, ec := DefaultService.OpGetStreamerList(&gin.Context{}, &streamerproto.OpListReq{ - Sort: "-fans", + list, err := _DefaultStreamer.OpList(&gin.Context{}, &streamerproto.OpListReq{ + Sort: []string{"fans"}, }) - if ec != errcode.ErrCodeAccountRelationSrvOk { - logger.Error("OpGetStreamerList fail, ec: %v", ec) - return fmt.Sprintf("OpGetStreamerList fail, ec: %v", ec) + if err != nil { + logger.Error("OpList fail, err: %v", err) + return fmt.Sprintf("OpList fail, err: %v", err) } midList := make([]int64, len(list)) for i, streamer := range list { midList[i] = util.DerefInt64(streamer.Mid) } - err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", midList, 0) + + err = redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", midList, 0) if err != nil { logger.Error("Redis cache fail, err: %v", err) return fmt.Sprintf("Redis cache fail, err: %v", err) @@ -309,15 +309,27 @@ func (s *CronService) ReloadMomentRecommList(ctx context.Context, param *xxl.Run logger.Error("OpGetMomentList fail, err: %v", err) return fmt.Sprintf("OpGetMomentList fail, err: %v", err) } - momentIdList := make([]int64, len(list)) - for i, moment := range list { - momentIdList[i] = util.DerefInt64(moment.Id) + + // 清缓存 + if err := redis.GetRedisClient().Del(consts.RedisMomentPrefix + "recomm_list"); err != nil { + logger.Error("Del redis cache fail, err: %v", err) + return fmt.Sprintf("Del redis cache fail, err: %v", err) } - err = redis.GetRedisClient().Set(consts.RedisMomentPrefix+"recomm_list", momentIdList, 0) - if err != nil { - logger.Error("Redis cache fail, err: %v", err) - return fmt.Sprintf("Redis cache fail, err: %v", err) + if err := redis.GetRedisClient().Del(consts.RedisMomentPrefix + "recent_list_offset"); err != nil { + logger.Error("Del redis cache fail, err: %v", err) + return fmt.Sprintf("Del redis cache fail, err: %v", err) } + + // 加载缓存 + for _, moment := range list { + id := util.DerefInt64(moment.Id) + err := redis.GetRedisClient().RPush(consts.RedisMomentPrefix+"recomm_list", id) + if err != nil { + logger.Error("Redis cache fail, err: %v", err) + return fmt.Sprintf("Redis cache fail, err: %v", err) + } + } + logger.Info("Refresh moment recommendation list cached in redis accomplished...") return "Refresh moment recommendation list cached in redis accomplished" } diff --git a/library/redis/redis.go b/library/redis/redis.go index 7a481854..b80efdb2 100644 --- a/library/redis/redis.go +++ b/library/redis/redis.go @@ -300,6 +300,12 @@ func (c *Client) HGetAll(key string, val interface{}) error { return err } +// HINCRBY 将保存的整型键值增加 +func (c *Client) HIncrby(key string, field string, val int) (reply interface{}, err error) { + reply, err = c.Do("HINCRBY", c.getKey(key), field, val) + return +} + /** Redis列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边) **/ @@ -476,7 +482,7 @@ func (c *Client) LREM(key string, count int, member interface{}) (int, error) { // LLen 获取列表的长度 func (c *Client) LLen(key string) (int64, error) { - return Int64(c.Do("RPOP", c.getKey(key))) + return Int64(c.Do("LLEN", c.getKey(key))) } // LRange 返回列表 key 中指定区间内的元素,区间以偏移量 start 和 stop 指定。 @@ -487,6 +493,11 @@ func (c *Client) LRange(key string, start, end int) (interface{}, error) { return c.Do("LRANGE", c.getKey(key), start, end) } +// LIndex 返回列表 key 中指定索引的元素。 +func (c *Client) LIndexInt64(key string, index int) (int64, error) { + return Int64(c.Do("LINDEX", c.getKey(key), index)) +} + /** Redis 有序集合和集合一样也是string类型元素的集合,且不允许重复的成员。 不同的是每个元素都会关联一个double类型的分数。redis正是通过分数来为集合中的成员进行从小到大的排序。 @@ -813,3 +824,8 @@ func (c *Client) closePool() { // func init() { // cache.Register("redis", &Client{}) // } + +// 是否是空错误 +func (c *Client) IsErrNil(err error) bool { + return err == redis.ErrNil +}