diff --git a/api/consts/consts.go b/api/consts/consts.go index 5f86b535..a1d3b2b2 100644 --- a/api/consts/consts.go +++ b/api/consts/consts.go @@ -15,6 +15,12 @@ const ( DefaultPageSize = 10 //默认页长为10 ) +// 推荐每次吞吐量 +const ( + StreamerRecommThroughput = 4 + MomentRecommThroughput = 4 +) + // 设备类型 const ( DevType_Android = 0 @@ -68,9 +74,10 @@ const ( // redis键前缀 const ( RedisStreamerPrefix = "streamer:" //streamer服务前缀 + RedisMomentPrefix = "moment:" //moment服务前缀 ) -//const PackageRootPath = "C:/Users/PC/Desktop/wishpal_ironfan_service/service" +//const PackageRootPath = "C:/Users/PC/Desktop/service" const PackageRootPath = "/app/wishpal-ironfan" @@ -78,7 +85,7 @@ const MainConfigPath = PackageRootPath + "/etc/mix/mix-test.yaml" const ProductionConfigPath = PackageRootPath + "/etc/mix/mix-prod.yaml" -const LocalConfigPath = "/Users/erwin/wishpal/wishpal-ironfan/etc/mix/mix-local.yaml" +const LocalConfigPath = "C:/Users/PC/Desktop/service/etc/mix/mix-local.yaml" const ReservedUserIdRegexesConfig = PackageRootPath + "/etc/mix/resource/reg_reserved_user_id_config.xml" diff --git a/api/consts/option.go b/api/consts/option.go index ba31b497..4ea98f97 100644 --- a/api/consts/option.go +++ b/api/consts/option.go @@ -13,3 +13,9 @@ const ( const ( AccountPunishment_BlockFromCreatingMoment = 0 // 禁止发贴 ) + +const ( + Recomm_Down = 0 + Recomm_Up = 1 + Recomm_Init = 2 +) diff --git a/api/errcode/errcode.go b/api/errcode/errcode.go index 8c4f4c06..ba3d1848 100644 --- a/api/errcode/errcode.go +++ b/api/errcode/errcode.go @@ -166,6 +166,9 @@ var ErrCodeMsgMap = map[ErrCode]string{ ErrCodeAccountCancellationSrvFail: "账户注销服务错误", ErrCodeAccountCancellationNotExist: "账户注销不存在", + + ErrCodeUserVisitOffsetSrvFail: "用户游标表服务错误", + ErrCodeUserVisitOffsetNotExist: "用户游标表不存在", } const ( @@ -380,6 +383,16 @@ const ( ErrCodeMomentAuditTaskSrvFail ErrCode = -29001 // 动态审核任务表服务错误 ErrCodeMomentAuditTaskNotExist ErrCode = -29002 // 动态审核任务表不存在 + // AccountCancellation: 30xxx + ErrCodeAccountCancellationSrvOk ErrCode = ErrCodeOk + ErrCodeAccountCancellationSrvFail ErrCode = -30001 // 账户注销服务错误 + ErrCodeAccountCancellationNotExist ErrCode = -30002 // 账户注销不存在 + + // UserVisitOffset: 30xxx + ErrCodeUserVisitOffsetSrvOk ErrCode = ErrCodeOk + ErrCodeUserVisitOffsetSrvFail ErrCode = -31001 // 用户游标表服务错误 + ErrCodeUserVisitOffsetNotExist ErrCode = -31002 // 用户游标表不存在 + // AccountPunishment: 32xxx ErrCodeAccountPunishmentSrvOk ErrCode = ErrCodeOk ErrCodeAccountPunishmentSrvFail ErrCode = -32001 // 账号处罚服务错误 @@ -390,11 +403,6 @@ const ( ErrCodeAccountPunishmentHasBeenInterrupted ErrCode = -32006 // 账号处罚已提前中止 ErrCodeAccountPunishmentStreamerOnly ErrCode = -32007 // 该账号处罚仅能对主播进行 - // AccountCancellation: 30xxx - ErrCodeAccountCancellationSrvOk ErrCode = ErrCodeOk - ErrCodeAccountCancellationSrvFail ErrCode = -30001 // 账户注销服务错误 - ErrCodeAccountCancellationNotExist ErrCode = -30002 // 账户注销不存在 - // Media: 60xxx ErrCodeMediaSrvOk ErrCode = ErrCodeOk ErrCodeMediaSrvFail ErrCode = -60001 // 媒体服务错误 diff --git a/api/proto/moment/proto/moment_api.go b/api/proto/moment/proto/moment_api.go index 6617f8c9..79fff392 100644 --- a/api/proto/moment/proto/moment_api.go +++ b/api/proto/moment/proto/moment_api.go @@ -140,3 +140,18 @@ type ApiListByIdsResp struct { base.BaseResponse Data *ApiListByIdsData `json:"data"` } + +// api 推荐 +type ApiRecommListReq struct { + base.BaseRequest + OpType int64 `json:"op_type"` +} + +type ApiRecommListData struct { + RecommList []*ApiMomentVO `json:"recomm_list"` +} + +type ApiRecommListResp struct { + base.BaseResponse + Data *ApiRecommListData `json:"data"` +} diff --git a/api/proto/streamer/proto/streamer_api.go b/api/proto/streamer/proto/streamer_api.go index 409b915c..b518bd91 100644 --- a/api/proto/streamer/proto/streamer_api.go +++ b/api/proto/streamer/proto/streamer_api.go @@ -24,7 +24,7 @@ type ApiListReq struct { base.BaseRequest Offset int `json:"offset"` Limit int `json:"limit"` - Sort string + Sort []string } type ApiListData struct { @@ -180,12 +180,11 @@ type ApiListStreamerWxIdResp struct { // api 推荐 type ApiRecommListReq struct { base.BaseRequest - Offset int `json:"offset"` - Limit int `json:"limit"` + OpType int64 `json:"op_type"` } type ApiRecommListData struct { - RecommList []int64 `json:"recomm_list"` + RecommList []*ApiListExtVO `json:"recomm_list"` } type ApiRecommListResp struct { diff --git a/api/proto/streamer/proto/streamer_op.go b/api/proto/streamer/proto/streamer_op.go index 182e3a35..dfbeb256 100644 --- a/api/proto/streamer/proto/streamer_op.go +++ b/api/proto/streamer/proto/streamer_op.go @@ -52,7 +52,7 @@ type OpListReq struct { base.BaseRequest Offset int `json:"offset"` Limit int `json:"limit"` - Sort string + Sort []string } type OpListData struct { diff --git a/app/mix/controller/init.go b/app/mix/controller/init.go index 2a9705dd..9623dc67 100644 --- a/app/mix/controller/init.go +++ b/app/mix/controller/init.go @@ -107,6 +107,7 @@ func Init(r *gin.Engine) { apiMomentGroup.POST("list_by_mids", middleware.JSONParamValidator(momentproto.ApiListByMidsReq{}), middleware.JwtAuthenticator(), ApiGetMomentListByMids) apiMomentGroup.POST("thumbs_up", middleware.JSONParamValidator(momentproto.ApiThumbsUpReq{}), middleware.JwtAuthenticator(), ApiThumbsUpMoment) apiMomentGroup.POST("list_by_ids", middleware.JSONParamValidator(momentproto.ApiListByIdsReq{}), middleware.JwtAuthenticator(), ApiGetMomentListByIds) + apiMomentGroup.POST("recomm_list", middleware.JSONParamValidator(momentproto.ApiRecommListReq{}), middleware.JwtAuthenticator(), ApiGetMomentRecommList) // 足迹 // apiFootPrintGroup := r.Group("/api/footprint", PrepareToC()) @@ -323,7 +324,7 @@ func Init(r *gin.Engine) { opStreamerGroup.POST("list_ext_fuzzily_by_user_id", middleware.JSONParamValidator(streamerproto.OpListExtFuzzilyByUserIdReq{}), middleware.JwtAuthenticator(), OpGetStreamerExtListFuzzilyByUserId) opStreamerGroup.POST("list_ext_fuzzily_by_name", middleware.JSONParamValidator(streamerproto.OpListExtFuzzilyByNameReq{}), middleware.JwtAuthenticator(), OpGetStreamerExtListFuzzilyByName) opStreamerGroup.POST("list_wx_id", middleware.JSONParamValidator(streamerproto.OpListStreamerWxIdReq{}), middleware.JwtAuthenticator(), OpGetStreamerWxId) - opStreamerGroup.POST("recomm_list", middleware.JSONParamValidator(streamerproto.OpRecommListReq{}), middleware.JwtAuthenticator(), OpGetStreamerRecommList) + //opStreamerGroup.POST("recomm_list", middleware.JSONParamValidator(streamerproto.OpRecommListReq{}), middleware.JwtAuthenticator(), OpGetStreamerRecommList) // 意见反馈 opFeedbackGroup := r.Group("/op/feedback", PrepareOp()) diff --git a/app/mix/controller/moment_api.go b/app/mix/controller/moment_api.go index fca17ce3..7be75266 100644 --- a/app/mix/controller/moment_api.go +++ b/app/mix/controller/moment_api.go @@ -187,3 +187,25 @@ func ApiThumbsUpMoment(ctx *gin.Context) { ReplyOk(ctx, nil) } + +func ApiGetMomentRecommList(ctx *gin.Context) { + req := ctx.MustGet("client_req").(*momentproto.ApiRecommListReq) + + list, ec := service.DefaultService.ApiGetMomentRecommList(ctx, req) + if ec != errcode.ErrCodeMomentSrvOk { + logger.Error("OpGetMomentRecommList fail, req: %v, ec: %v", util.ToJson(req), ec) + ReplyErrCodeMsg(ctx, ec) + return + } + + mediaFillableList := make([]mediafiller.MediaFillable, len(list)) + for i, media := range list { + mediaFillableList[i] = media.MediaComp + } + mediafiller.FillList(ctx, mediaFillableList) + + data := &momentproto.ApiRecommListData{ + RecommList: list, + } + ReplyOk(ctx, data) +} diff --git a/app/mix/controller/streamer_api.go b/app/mix/controller/streamer_api.go index a039ae35..25451200 100644 --- a/app/mix/controller/streamer_api.go +++ b/app/mix/controller/streamer_api.go @@ -223,6 +223,16 @@ func ApiGetStreamerRecommList(ctx *gin.Context) { return } + objectMediaNum := 4 // 单个主播服务总共4个媒体类 + mediaFillableList := make([]mediafiller.MediaFillable, len(list)*objectMediaNum) + for i, vo := range list { + mediaFillableList[objectMediaNum*i+0] = vo.Avatar + mediaFillableList[objectMediaNum*i+1] = vo.Cover + mediaFillableList[objectMediaNum*i+2] = vo.Shorts + mediaFillableList[objectMediaNum*i+3] = vo.Album + } + mediafiller.FillList(ctx, mediaFillableList) + data := &streamerproto.ApiRecommListData{ RecommList: list, } diff --git a/app/mix/controller/streamer_op.go b/app/mix/controller/streamer_op.go index c847d2c1..082fe745 100644 --- a/app/mix/controller/streamer_op.go +++ b/app/mix/controller/streamer_op.go @@ -242,19 +242,3 @@ func OpGetStreamerWxId(ctx *gin.Context) { } ReplyOk(ctx, data) } - -func OpGetStreamerRecommList(ctx *gin.Context) { - req := ctx.MustGet("client_req").(*streamerproto.OpRecommListReq) - - list, ec := service.DefaultService.OpGetStreamerRecommList(ctx, req) - if ec != errcode.ErrCodeStreamerSrvOk { - logger.Error("OpGetStreamerRecommList fail, req: %v, ec: %v", util.ToJson(req), ec) - ReplyErrCodeMsg(ctx, ec) - return - } - - data := &streamerproto.OpRecommListData{ - RecommList: list, - } - ReplyOk(ctx, data) -} diff --git a/app/mix/dao/mongo.go b/app/mix/dao/mongo.go index 0983c0ac..908d5a64 100644 --- a/app/mix/dao/mongo.go +++ b/app/mix/dao/mongo.go @@ -164,6 +164,12 @@ const ( DBAccountCancellation = "account_cancellation" COLAccountCancellation = "account_cancellation" + + DBUserVisitOffset = "user_visit_offset" + COLUserVisitOffset = "user_visit_offset" + + DBUserMomentVisitOffset = "user_moment_visit_offset" + COLUserMomentVisitOffset = "user_moment_visit_offset" ) // 商品表 @@ -390,6 +396,16 @@ func (m *Mongo) getColAccountCancellation() *qmgo.Collection { return m.clientMix.Database(DBAccountCancellation).Collection(COLAccountCancellation) } +// 用户访问偏移量表 +func (m *Mongo) getColUserVisitOffset() *qmgo.Collection { + return m.clientMix.Database(DBUserVisitOffset).Collection(COLUserVisitOffset) +} + +// 用户动态访问偏移量表 +func (m *Mongo) getColUserMomentVisitOffset() *qmgo.Collection { + return m.clientMix.Database(DBUserMomentVisitOffset).Collection(COLUserMomentVisitOffset) +} + // 商品相关 func (m *Mongo) CreateProduct(ctx *gin.Context, product *dbstruct.Product) error { col := m.getColProduct() @@ -2120,10 +2136,10 @@ func (m *Mongo) GetStreamerList(ctx *gin.Context, req *streamerproto.OpListReq) "del_flag": 0, } //排序规则 - if req.Sort == "" { - req.Sort = "-ct" + if len(req.Sort) == 0 { + req.Sort = append(req.Sort, "-ct") } - err := col.Find(ctx, query).Sort(req.Sort).Skip(int64(req.Offset)).Limit(int64(req.Limit)).All(&list) + err := col.Find(ctx, query).Sort(req.Sort...).Skip(int64(req.Offset)).Limit(int64(req.Limit)).All(&list) if err == qmgo.ErrNoSuchDocuments { err = nil return list, err @@ -3609,3 +3625,81 @@ func (m *Mongo) GetAccountCancellationListByMid(ctx *gin.Context, req *account_c } return accountCancellation, err } + +func (m *Mongo) CreateUserVisitOffset(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error { + col := m.getColUserVisitOffset() + _, err := col.InsertOne(ctx, uservisitoffset) + return err +} + +func (m *Mongo) UpdateUserVisitOffset(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error { + col := m.getColUserVisitOffset() + filter := qmgo.M{ + "_id": uservisitoffset.Id, + "ver": uservisitoffset.Ver, + } + set := qmgo.M{ + "streamer_recomm_offset": uservisitoffset.StreamerRecommOffset, + } + if uservisitoffset.BottomFlag != nil { + set["bottom_flag"] = uservisitoffset.GetBottomFlag() + } + update := qmgo.M{ + "$set": set, + } + err := col.UpdateOne(ctx, filter, update) + return err +} + +func (m *Mongo) GetUserVisitOffset(ctx *gin.Context, mid int64) (*dbstruct.UserVisitOffset, error) { + uservisitoffset := &dbstruct.UserVisitOffset{} + col := m.getColUserVisitOffset() + query := qmgo.M{ + "_id": mid, + } + err := col.Find(ctx, query).One(uservisitoffset) + if err == qmgo.ErrNoSuchDocuments { + err = nil + return nil, err + } + return uservisitoffset, err +} + +func (m *Mongo) CreateUserMomentVisitOffset(ctx *gin.Context, usermomentvisitoffset *dbstruct.UserMomentVisitOffset) error { + col := m.getColUserMomentVisitOffset() + _, err := col.InsertOne(ctx, usermomentvisitoffset) + return err +} + +func (m *Mongo) UpdateUserMomentVisitOffset(ctx *gin.Context, usermomentvisitoffset *dbstruct.UserMomentVisitOffset) error { + col := m.getColUserMomentVisitOffset() + filter := qmgo.M{ + "_id": usermomentvisitoffset.Id, + "ver": usermomentvisitoffset.Ver, + } + set := qmgo.M{ + "moment_recomm_offset": usermomentvisitoffset.MomentRecommOffset, + } + if usermomentvisitoffset.BottomFlag != nil { + set["bottom_flag"] = usermomentvisitoffset.GetBottomFlag() + } + update := qmgo.M{ + "$set": set, + } + err := col.UpdateOne(ctx, filter, update) + return err +} + +func (m *Mongo) GetUserMomentVisitOffset(ctx *gin.Context, mid int64) (*dbstruct.UserMomentVisitOffset, error) { + usermomentvisitoffset := &dbstruct.UserMomentVisitOffset{} + col := m.getColUserMomentVisitOffset() + query := qmgo.M{ + "_id": mid, + } + err := col.Find(ctx, query).One(usermomentvisitoffset) + if err == qmgo.ErrNoSuchDocuments { + err = nil + return nil, err + } + return usermomentvisitoffset, err +} diff --git a/app/mix/service/apiservice.go b/app/mix/service/apiservice.go index 6b8ea2a8..10bc2edd 100644 --- a/app/mix/service/apiservice.go +++ b/app/mix/service/apiservice.go @@ -1339,7 +1339,7 @@ func (s *Service) ApiGetStreamerWxId(ctx *gin.Context, req *streamerproto.ApiLis } // 推荐 -func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto.ApiRecommListReq) (recommlist []int64, ec errcode.ErrCode) { +func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto.ApiRecommListReq) (recommStreamerList []*streamerproto.ApiListExtVO, ec errcode.ErrCode) { ec = errcode.ErrCodeStreamerSrvOk @@ -1351,48 +1351,15 @@ func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto. return nil, errcode.ErrCodeApolloReadFail } - //1.从redis中获取数据 - err = redis.GetRedisClient().GetObject(consts.RedisStreamerPrefix+"recomm_list", &recommlist) + // 从redis中获取主播列表 + recommlist, err := s.utilGetStreamerRecommList(ctx) if err != nil { - logger.Error("Redis read failed : %v", err) - ec = errcode.ErrCodeStreamerRecommListRedisCacheInvalid + logger.Error("utilGetStreamerRecommList fail, err: %v", err) + ec = errcode.ErrCodeStreamerSrvFail return } - //2.若redis命中失败,再从数据库查 - if len(recommlist) == 0 { - logger.Error("Redis hit failed, reading recommendation list from mongo...") - - // recommlist, _, err = _DefaultAccountRelation.OpIsFollowedCount(ctx) - // if err != nil { - // logger.Error("OpIsFollowedCount fail, err: %v", err) - // ec = errcode.ErrCodeAccountRelationSrvFail - // return - // } - - list, err := _DefaultStreamer.OpList(ctx, &streamerproto.OpListReq{ - Sort: "-fans", - }) - if err != nil { - logger.Error("OpList fail, err: %v", err) - ec = errcode.ErrCodeAccountRelationSrvFail - return - } - recommlist = make([]int64, len(list)) - for i, streamer := range list { - recommlist[i] = util.DerefInt64(streamer.Mid) - } - - //若数据库命中成功,则立即加载进redis - if len(recommlist) != 0 { - err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", recommlist, 0) - if err != nil { - logger.Error("Redis cache fail, err: %v", err) - } - - } - } - + // 查询是否限制访客 visitorMid := req.BaseRequest.Mid for _, restricted_visitor_mid := range cfg.RestrictedVisitorMids { if restricted_visitor_mid == visitorMid { // 是限制访问的访客 @@ -1411,6 +1378,13 @@ func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto. } } + recommStreamerList, err = s.utilGetStreamerRecommListVO(ctx, recommlist, req.Mid, req.OpType) + if err != nil { + logger.Error("utilGetUpStreamerRecommList fail, err: %v", err) + ec = errcode.ErrCodeStreamerSrvFail + return + } + return } @@ -1710,6 +1684,21 @@ func (s *Service) ApiGetContactCustomerServiceSessionListByMid(ctx *gin.Context, func (s *Service) ApiCreateMoment(ctx *gin.Context, req *momentproto.ApiCreateReq) (ec errcode.ErrCode, acctPunEndTime string) { ec = errcode.ErrCodeMomentSrvOk + // 创建完动态,将动态加载到当前redis数组 + defer func() { + if req.Moment.Id == nil { + return + } + go func() { + id := util.DerefInt64(req.Moment.Id) + err := redis.GetRedisClient().RPush(consts.RedisMomentPrefix+"recent_list", id) + if err != nil { + logger.Error("Push newly-created moment to list failed : %v", err) + return + } + }() + }() + req.Moment.Mid = goproto.Int64(req.BaseRequest.Mid) var accountpunishment *dbstruct.AccountPunishment if ec, accountpunishment = s.ApiCreateMomentBusinessValidate(ctx, req); ec != errcode.ErrCodeMomentSrvOk { @@ -2010,6 +1999,15 @@ func (s *Service) ApiGetMomentListByIds(ctx *gin.Context, req *momentproto.ApiLi return } + // 获取访问者的关注列表 + visitorMid := req.GetBaseRequest().Mid + followMap, err := s.utilGetFollowMap(ctx, visitorMid) + if err != nil { + logger.Error("utilGetFollowMap fail") + ec = errcode.ErrCodeAccountRelationSrvFail + return + } + // 填充主播信息 vos, ec := s.utilFillMomentsStreamerInfo(ctx, list, consts.InterfaceType_Api) if ec != errcode.ErrCodeMomentSrvOk { @@ -2019,8 +2017,16 @@ func (s *Service) ApiGetMomentListByIds(ctx *gin.Context, req *momentproto.ApiLi } voList = make([]*momentproto.ApiMomentVO, 0) for _, vo := range vos { - opVO, _ := vo.(*momentproto.ApiMomentVO) - voList = append(voList, opVO) + apiVO, _ := vo.(*momentproto.ApiMomentVO) + // 填充是否关注 + s.utilFillIsFollowedFillable(ctx, followMap, apiVO) + // 填充是否点赞 + if err := s.utilFillIsThumbedUpFillable(ctx, visitorMid, apiVO); err != nil { + logger.Error("utilFillIsThumbedUpFillable fail") + ec = errcode.ErrCodeThumbsUpSrvFail + return + } + voList = append(voList, apiVO) } return } @@ -2053,6 +2059,44 @@ func (s *Service) ApiThumbsUpMoment(ctx *gin.Context, req *momentproto.ApiThumbs return } +// 推荐 +func (s *Service) ApiGetMomentRecommList(ctx *gin.Context, req *momentproto.ApiRecommListReq) (recommMomentList []*momentproto.ApiMomentVO, ec errcode.ErrCode) { + + ec = errcode.ErrCodeMomentSrvOk + + // 1.查找本时段有无新发动态 + ids, err := s.utilGetMomentRecentListIds(ctx, req.Mid, req.OpType, consts.MomentRecommThroughput) + if err != nil { + logger.Error("utilGetMomentRecentListIds fail, err: %v", err) + ec = errcode.ErrCodeMomentSrvFail + return + } + + // 2.删去已查询的新动态 + recommThroughput := int64(consts.MomentRecommThroughput - len(ids)) + + // 3.补齐后续动态 + if recommThroughput > 0 { + recommIds, err := s.utilGetMomentRecommListIds(ctx, req.Mid, req.OpType, recommThroughput) + if err != nil { + logger.Error("utilGetMomentRecommListIds fail, err: %v", err) + ec = errcode.ErrCodeMomentSrvFail + return + } + ids = append(ids, recommIds...) + } + + // 4.查询动态内容 + recommMomentList, err = s.utilGetApiMomentVOListByIds(ctx, req.Mid, ids) + if err != nil { + logger.Error("utilGetMomentRecentListIds fail, err: %v", err) + ec = errcode.ErrCodeMomentSrvFail + return + } + + return +} + func (s *Service) ApiGetThumbsUpList(ctx *gin.Context, req *thumbsupproto.ApiListReq) (list []*dbstruct.ThumbsUp, ec errcode.ErrCode) { ec = errcode.ErrCodeThumbsUpSrvOk diff --git a/app/mix/service/cronservice.go b/app/mix/service/cronservice.go index eec01d8d..01bbd362 100644 --- a/app/mix/service/cronservice.go +++ b/app/mix/service/cronservice.go @@ -56,6 +56,7 @@ func (s *CronService) Init(c any) (err error) { //exec.RegTask("clear_content_audit_batch_execution_logs", s.ClearContentAuditBatchExecutionLogs) exec.RegTask("send_contact_customer_services_of_last_minute", s.SendContactCustomerServicesOfLastMinute) exec.RegTask("cancel_account_at_due_time", s.CancelAccountsAtDueTime) + exec.RegTask("reload_moment_recomm_list", s.ReloadMomentRecommList) exec.LogHandler(customLogHandle) //注册任务handler diff --git a/app/mix/service/logic/user_moment_visit_offset.go b/app/mix/service/logic/user_moment_visit_offset.go new file mode 100644 index 00000000..f75a961c --- /dev/null +++ b/app/mix/service/logic/user_moment_visit_offset.go @@ -0,0 +1,49 @@ +package logic + +import ( + "service/app/mix/dao" + "service/dbstruct" + "service/library/logger" + + "github.com/gin-gonic/gin" +) + +type UserMomentVisitOffset struct { + store *dao.Store +} + +func NewUserMomentVisitOffset(store *dao.Store) (a *UserMomentVisitOffset) { + a = &UserMomentVisitOffset{ + store: store, + } + return +} + +func (p *UserMomentVisitOffset) OpCreate(ctx *gin.Context, uservisitoffset *dbstruct.UserMomentVisitOffset) error { + err := p.store.CreateUserMomentVisitOffset(ctx, uservisitoffset) + if err != nil { + logger.Error("CreateUserMomentVisitOffset fail, err: %v", err) + return err + } + return nil +} + +func (p *UserMomentVisitOffset) OpUpdate(ctx *gin.Context, uservisitoffset *dbstruct.UserMomentVisitOffset) error { + err := p.store.UpdateUserMomentVisitOffset(ctx, uservisitoffset) + if err != nil { + logger.Error("UpdateUserMomentVisitOffset fail, err: %v", err) + return err + } + return nil +} + +func (p *UserMomentVisitOffset) OpGetUserMomentVisitOffset(ctx *gin.Context, mid int64) (*dbstruct.UserMomentVisitOffset, error) { + + uservisitoffset, err := p.store.GetUserMomentVisitOffset(ctx, mid) + if err != nil { + logger.Error("GetUserMomentVisitOffset fail, err: %v", err) + return nil, err + } + + return uservisitoffset, nil +} diff --git a/app/mix/service/logic/user_visit_offset.go b/app/mix/service/logic/user_visit_offset.go new file mode 100644 index 00000000..0c83578c --- /dev/null +++ b/app/mix/service/logic/user_visit_offset.go @@ -0,0 +1,49 @@ +package logic + +import ( + "service/app/mix/dao" + "service/dbstruct" + "service/library/logger" + + "github.com/gin-gonic/gin" +) + +type UserVisitOffset struct { + store *dao.Store +} + +func NewUserVisitOffset(store *dao.Store) (a *UserVisitOffset) { + a = &UserVisitOffset{ + store: store, + } + return +} + +func (p *UserVisitOffset) OpCreate(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error { + err := p.store.CreateUserVisitOffset(ctx, uservisitoffset) + if err != nil { + logger.Error("CreateUserVisitOffset fail, err: %v", err) + return err + } + return nil +} + +func (p *UserVisitOffset) OpUpdate(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error { + err := p.store.UpdateUserVisitOffset(ctx, uservisitoffset) + if err != nil { + logger.Error("UpdateUserVisitOffset fail, err: %v", err) + return err + } + return nil +} + +func (p *UserVisitOffset) OpGetUserVisitOffset(ctx *gin.Context, mid int64) (*dbstruct.UserVisitOffset, error) { + + uservisitoffset, err := p.store.GetUserVisitOffset(ctx, mid) + if err != nil { + logger.Error("GetUserVisitOffset fail, err: %v", err) + return nil, err + } + + return uservisitoffset, nil +} diff --git a/app/mix/service/service.go b/app/mix/service/service.go index 49af494e..ee4d7c66 100644 --- a/app/mix/service/service.go +++ b/app/mix/service/service.go @@ -51,7 +51,6 @@ import ( "service/library/mycrypto" "service/library/payclients/alipaycli" "service/library/payclients/wxpaycli" - "service/library/redis" accountpunishmentproto "service/api/proto/accountpunishment/proto" @@ -105,6 +104,8 @@ var ( _DefaultMomentAuditTask *logic.MomentAuditTask _DefaultAccountPunishment *logic.AccountPunishment _DefaultAccountCancellation *logic.AccountCancellation + _DefaultUserVisitOffset *logic.UserVisitOffset + _DefaultUserMomentVisitOffset *logic.UserMomentVisitOffset ) type Service struct { @@ -183,6 +184,8 @@ func (s *Service) Init(c any) (err error) { _DefaultMomentAuditTask = logic.NewMomentAuditTask(store) _DefaultAccountPunishment = logic.NewAccountPunishment(store) _DefaultAccountCancellation = logic.NewAccountCancellation(store) + _DefaultUserVisitOffset = logic.NewUserVisitOffset(store) + _DefaultUserMomentVisitOffset = logic.NewUserMomentVisitOffset(store) return } @@ -1985,56 +1988,6 @@ func (s *Service) OpGetStreamerWxId(ctx *gin.Context, req *streamerproto.OpListS return } -// 推荐 -func (s *Service) OpGetStreamerRecommList(ctx *gin.Context, req *streamerproto.OpRecommListReq) (recommlist []int64, ec errcode.ErrCode) { - - ec = errcode.ErrCodeStreamerSrvOk - - //1.从redis中获取数据 - err := redis.GetRedisClient().GetObject(consts.RedisStreamerPrefix+"recomm_list", &recommlist) - if err != nil { - logger.Error("Redis read failed : %v", err) - ec = errcode.ErrCodeStreamerRecommListRedisCacheInvalid - return - } - - //2.若redis命中失败,再从数据库查 - if len(recommlist) == 0 { - logger.Error("Redis hit failed, reading recommendation list from mongo...") - - // recommlist, _, err = _DefaultAccountRelation.OpIsFollowedCount(ctx) - // if err != nil { - // logger.Error("OpIsFollowedCount fail, err: %v", err) - // ec = errcode.ErrCodeAccountRelationSrvFail - // return - // } - - list, err := _DefaultStreamer.OpList(ctx, &streamerproto.OpListReq{ - Sort: "-fans", - }) - if err != nil { - logger.Error("OpList fail, err: %v", err) - ec = errcode.ErrCodeAccountRelationSrvFail - return - } - recommlist = make([]int64, len(list)) - for i, streamer := range list { - recommlist[i] = util.DerefInt64(streamer.Mid) - } - - //若数据库命中成功,则立即加载进redis - if len(recommlist) != 0 { - err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", recommlist, 0) - if err != nil { - logger.Error("Redis cache fail, err: %v", err) - } - - } - } - - return -} - // Feedback func (s *Service) OpCreateFeedback(ctx *gin.Context, req *feedbackproto.OpCreateReq) (ec errcode.ErrCode) { ec = errcode.ErrCodeFeedbackSrvOk diff --git a/app/mix/service/utilservice.go b/app/mix/service/utilservice.go index 6b9f89dc..14188534 100644 --- a/app/mix/service/utilservice.go +++ b/app/mix/service/utilservice.go @@ -23,6 +23,7 @@ import ( "service/dbstruct" "service/library/apollo" "service/library/logger" + "service/library/redis" "strings" "time" @@ -728,3 +729,575 @@ func (s *Service) utilCancelAccountByMids(ctx *gin.Context, midList []int64) err return nil } + +func (s *Service) utilGetStreamerRecommList(ctx *gin.Context) (recommlist []int64, err error) { + + // 1.从redis中获取数据 + err = redis.GetRedisClient().GetObject(consts.RedisStreamerPrefix+"recomm_list", &recommlist) + if err != nil { + logger.Error("Redis read failed : %v", err) + return + } + + // 2.若redis命中失败,再从数据库查 + if len(recommlist) == 0 { + logger.Error("Redis hit failed, reading recommendation list from mongo...") + + list, err := _DefaultStreamer.OpList(ctx, &streamerproto.OpListReq{ + Sort: []string{"-fans"}, + }) + if err != nil { + logger.Error("OpList fail, err: %v", err) + return nil, err + } + recommlist = make([]int64, len(list)) + for i, streamer := range list { + recommlist[i] = util.DerefInt64(streamer.Mid) + } + + //若数据库命中成功,则立即加载进redis + if len(recommlist) != 0 { + err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", recommlist, 0) + if err != nil { + logger.Error("Redis cache fail, err: %v", err) + } + } + } + + return +} + +func (s *Service) utilGetInitUserVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (err error) { + uservisitoffset, err := _DefaultUserVisitOffset.OpGetUserVisitOffset(ctx, mid) + if err != nil { + logger.Error("OpGetUserVisitOffset fail, err: %v", err) + return + } + + var execFunc func(*gin.Context, *dbstruct.UserVisitOffset) error + + if uservisitoffset == nil { + execFunc = _DefaultUserVisitOffset.OpCreate + } else { + execFunc = _DefaultUserVisitOffset.OpUpdate + } + + // 吞吐量大于等于推荐数组长度,则这次获取后已经触底 + if consts.StreamerRecommThroughput >= recommlistLength { + err = execFunc(ctx, &dbstruct.UserVisitOffset{ + Id: mid, + StreamerRecommOffset: 0, + BottomFlag: goproto.Int64(1), + Ver: 0, + }) + if err != nil { + logger.Error("_DefaultUserVisitOffset OpCreate fail, err: %v", err) + return + } + } else { + err = execFunc(ctx, &dbstruct.UserVisitOffset{ + Id: mid, + StreamerRecommOffset: consts.StreamerRecommThroughput, + BottomFlag: goproto.Int64(0), + Ver: 0, + }) + if err != nil { + logger.Error("_DefaultUserVisitOffset OpCreate fail, err: %v", err) + return + } + } + + return nil +} + +func (s *Service) utilGetUpUserVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (uservisitoffset *dbstruct.UserVisitOffset, err error) { + uservisitoffset, err = _DefaultUserVisitOffset.OpGetUserVisitOffset(ctx, mid) + if err != nil { + logger.Error("OpGetUserVisitOffset fail, err: %v", err) + return + } + if uservisitoffset == nil { + // 默认用户刚进推荐页面的第一次操作一定是冷启动操作,游标也由冷启动创建 + return nil, qmgo.ErrNoSuchDocuments + } else { + nowoffset := (uservisitoffset.StreamerRecommOffset + consts.StreamerRecommThroughput) % recommlistLength + // 向上操作固定清掉触底标志 + var bottomFlagPtr *int64 = nil + if uservisitoffset.GetBottomFlag() == 1 { + bottomFlagPtr = goproto.Int64(0) + } + err = _DefaultUserVisitOffset.OpUpdate(ctx, &dbstruct.UserVisitOffset{ + Id: uservisitoffset.Id, + StreamerRecommOffset: nowoffset, + BottomFlag: bottomFlagPtr, + Ver: uservisitoffset.Ver, + }) + if err != nil { + logger.Error("_DefaultUserVisitOffset OpUpdate fail, err: %v", err) + return + } + } + return +} + +func (s *Service) utilGetDownUserVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (uservisitoffset *dbstruct.UserVisitOffset, err error) { + uservisitoffset, err = _DefaultUserVisitOffset.OpGetUserVisitOffset(ctx, mid) + if err != nil { + logger.Error("OpGetUserVisitOffset fail, err: %v", err) + return + } + + if uservisitoffset == nil { + // 默认用户刚进推荐页面的第一次操作一定是冷启动操作,游标也由冷启动创建 + return nil, qmgo.ErrNoSuchDocuments + } else { + if uservisitoffset.GetBottomFlag() == 1 { + return + } + offset := uservisitoffset.StreamerRecommOffset + if offset+consts.StreamerRecommThroughput >= recommlistLength { + err = _DefaultUserVisitOffset.OpUpdate(ctx, &dbstruct.UserVisitOffset{ + Id: uservisitoffset.Id, + StreamerRecommOffset: 0, + BottomFlag: goproto.Int64(1), + Ver: uservisitoffset.Ver, + }) + if err != nil { + logger.Error("_DefaultUserVisitOffset OpUpdate fail, err: %v", err) + return + } + } else { + err = _DefaultUserVisitOffset.OpUpdate(ctx, &dbstruct.UserVisitOffset{ + Id: uservisitoffset.Id, + StreamerRecommOffset: offset + consts.StreamerRecommThroughput, + Ver: uservisitoffset.Ver, + }) + if err != nil { + logger.Error("_DefaultUserVisitOffset OpUpdate fail, err: %v", err) + return + } + } + } + return +} + +// 获取该用户当前在主播推荐数组中的游标和是否已经触底标志 +func (s *Service) utilGetStreamerRecommlistOffsetAndBottomeFlag(ctx *gin.Context, recommListLength int64, mid int64, opType int64) (offset int64, bottomFlag int64, err error) { + offset = int64(0) + var uservisitoffset *dbstruct.UserVisitOffset + + // 初始化操作 + switch opType { + case consts.Recomm_Init: + err = s.utilGetInitUserVisitOffset(ctx, mid, int64(recommListLength)) + if err != nil { + logger.Error("utilGetInitUserVisitOffset fail, err: %v", err) + return + } + // 向上滚动操作 + case consts.Recomm_Up: + // 若吞吐量比推荐数组长度小,则正常操作,否则认为游标永远是0 + if consts.StreamerRecommThroughput < recommListLength { + uservisitoffset, err = s.utilGetUpUserVisitOffset(ctx, mid, int64(recommListLength)) + if err != nil { + logger.Error("utilGetUpUserVisitOffset fail, err: %v", err) + return + } + offset = uservisitoffset.StreamerRecommOffset + } + // 向下滚动操作 + case consts.Recomm_Down: + // 若吞吐量比推荐数组长度小,则正常操作,否则认为直接触底 + if consts.StreamerRecommThroughput < recommListLength { + uservisitoffset, err = s.utilGetDownUserVisitOffset(ctx, mid, int64(recommListLength)) + if err != nil { + logger.Error("utilGetUpUserVisitOffset fail, err: %v", err) + return + } + if uservisitoffset.GetBottomFlag() == 1 { + return 0, 1, nil + } + offset = uservisitoffset.StreamerRecommOffset + } else { + return 0, 1, nil + } + } + return +} + +func (s *Service) utilGetStreamerRecommListVO(ctx *gin.Context, recommlist []int64, mid int64, opType int64) (recommStreamerList []*streamerproto.ApiListExtVO, err error) { + // 获取用户游标 + offset := int64(0) + recommListLength := int64(len(recommlist)) + + offset, bottomFlag, err := s.utilGetStreamerRecommlistOffsetAndBottomeFlag(ctx, recommListLength, mid, opType) + if err != nil { + logger.Error("utilGetStreamerRecommlistOffsetAndBottomeFlag fail, err: %v", err) + return + } + if bottomFlag == 1 { // 已触底 + return make([]*streamerproto.ApiListExtVO, 0), nil + } + + // 向下操作,去尾 + upperBound := int64(consts.StreamerRecommThroughput) + if opType == consts.Recomm_Down { + surplusVolume := recommListLength - offset + if surplusVolume < consts.StreamerRecommThroughput { + upperBound = surplusVolume + } + } + + // 根据用户游标查询得到结果 + midList := make([]int64, 0) + for i := int64(0); i < upperBound; i++ { + index := (offset + int64(i)) % int64(recommListLength) + midList = append(midList, recommlist[index]) + } + + accountList, err := _DefaultAccount.OpListByMids(ctx, &accountproto.OpListByMidsReq{ + Mids: midList, + Sort: []string{"_id"}, + }) + if err != nil { + logger.Error("_DefaultAccount OpListByMids fail, err: %v", err) + return + } + + // 构建一个accountList -> midList 索引的map + _accountIndexMap := make(map[int]int) + for i, account := range accountList { + mid1 := util.DerefInt64(account.Mid) + for j, mid2 := range midList { + if mid1 == mid2 { + _accountIndexMap[i] = j + } + } + } + + streamerExts := make([]streamerproto.StreamerExtVO, len(accountList)) + for i := range streamerExts { + streamerExts[i] = &streamerproto.ApiListExtVO{} + } + ec := s.utilExtendAccountsIntoStreamerExts(ctx, accountList, streamerExts) + if ec != errcode.ErrCodeStreamerSrvOk { + err = fmt.Errorf("extend accountlist into streamer_exts fail") + logger.Error("Extend accountlist into streamer_exts fail") + return + } + recommStreamerList = make([]*streamerproto.ApiListExtVO, len(streamerExts)) + for i, streamerExt := range streamerExts { + vo, _ := streamerExt.(*streamerproto.ApiListExtVO) + // 从下标位置映射回去 + recommStreamerList[_accountIndexMap[i]] = vo + } + + return +} + +func (s *Service) utilGetInitUserMomentVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64, throughput int64) (err error) { + uservisitoffset, err := _DefaultUserMomentVisitOffset.OpGetUserMomentVisitOffset(ctx, mid) + if err != nil { + logger.Error("OpGetUserMomentVisitOffset fail, err: %v", err) + return + } + + var execFunc func(*gin.Context, *dbstruct.UserMomentVisitOffset) error + + if uservisitoffset == nil { + execFunc = _DefaultUserMomentVisitOffset.OpCreate + } else { + execFunc = _DefaultUserMomentVisitOffset.OpUpdate + } + + // 吞吐量大于等于推荐数组长度,则这次获取后已经触底 + if throughput >= recommlistLength { + err = execFunc(ctx, &dbstruct.UserMomentVisitOffset{ + Id: mid, + MomentRecommOffset: 0, + BottomFlag: goproto.Int64(1), + Ver: 0, + }) + if err != nil { + logger.Error("_DefaultUserMomentVisitOffset OpCreate fail, err: %v", err) + return + } + } else { + err = execFunc(ctx, &dbstruct.UserMomentVisitOffset{ + Id: mid, + MomentRecommOffset: consts.MomentRecommThroughput, + BottomFlag: goproto.Int64(0), + Ver: 0, + }) + if err != nil { + logger.Error("_DefaultUserMomentVisitOffset OpCreate fail, err: %v", err) + return + } + } + + return nil +} + +func (s *Service) utilGetUpUserMomentVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64, throughput int64) (uservisitoffset *dbstruct.UserMomentVisitOffset, err error) { + uservisitoffset, err = _DefaultUserMomentVisitOffset.OpGetUserMomentVisitOffset(ctx, mid) + if err != nil { + logger.Error("OpGetUserMomentVisitOffset fail, err: %v", err) + return + } + if uservisitoffset == nil { + // 默认用户刚进推荐页面的第一次操作一定是冷启动操作,游标也由冷启动创建 + return nil, qmgo.ErrNoSuchDocuments + } else { + nowoffset := (uservisitoffset.MomentRecommOffset + throughput) % recommlistLength + // 向上操作固定清掉触底标志 + var bottomFlagPtr *int64 = nil + if uservisitoffset.GetBottomFlag() == 1 { + bottomFlagPtr = goproto.Int64(0) + } + err = _DefaultUserMomentVisitOffset.OpUpdate(ctx, &dbstruct.UserMomentVisitOffset{ + Id: uservisitoffset.Id, + MomentRecommOffset: nowoffset, + BottomFlag: bottomFlagPtr, + Ver: uservisitoffset.Ver, + }) + if err != nil { + logger.Error("_DefaultUserMomentVisitOffset OpUpdate fail, err: %v", err) + return + } + } + return +} + +func (s *Service) utilGetDownUserMomentVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64, throughput int64) (uservisitoffset *dbstruct.UserMomentVisitOffset, err error) { + uservisitoffset, err = _DefaultUserMomentVisitOffset.OpGetUserMomentVisitOffset(ctx, mid) + if err != nil { + logger.Error("OpGetUserMomentVisitOffset fail, err: %v", err) + return + } + + if uservisitoffset == nil { + // 默认用户刚进推荐页面的第一次操作一定是冷启动操作,游标也由冷启动创建 + return nil, qmgo.ErrNoSuchDocuments + } else { + if uservisitoffset.GetBottomFlag() == 1 { + return + } + offset := uservisitoffset.MomentRecommOffset + if offset+throughput >= recommlistLength { + err = _DefaultUserMomentVisitOffset.OpUpdate(ctx, &dbstruct.UserMomentVisitOffset{ + Id: uservisitoffset.Id, + MomentRecommOffset: 0, + BottomFlag: goproto.Int64(1), + Ver: uservisitoffset.Ver, + }) + if err != nil { + logger.Error("_DefaultUserMomentVisitOffset OpUpdate fail, err: %v", err) + return + } + } else { + err = _DefaultUserMomentVisitOffset.OpUpdate(ctx, &dbstruct.UserMomentVisitOffset{ + Id: uservisitoffset.Id, + MomentRecommOffset: offset + throughput, + Ver: uservisitoffset.Ver, + }) + if err != nil { + logger.Error("_DefaultUserMomentVisitOffset OpUpdate fail, err: %v", err) + return + } + } + } + return +} + +// 获取该用户当前在主播推荐数组中的游标和是否已经触底标志 +func (s *Service) utilGetMomentRecommlistOffsetAndBottomeFlag(ctx *gin.Context, mid int64, opType int64, recommListLength int64, throughput int64) (offset int64, isAtBottom int64, err error) { + offset = int64(0) + isRecommListSuffi := throughput <= recommListLength + var uservisitoffset *dbstruct.UserMomentVisitOffset + + // 初始化操作 + switch opType { + case consts.Recomm_Init: + err = s.utilGetInitUserMomentVisitOffset(ctx, mid, recommListLength, throughput) + if err != nil { + logger.Error("utilGetInitUserMomentVisitOffset fail, err: %v", err) + return + } + // 向上滚动操作 + case consts.Recomm_Up: + // 若吞吐量比推荐数组长度小,则正常操作,否则认为直接触顶(游标永远是0) + if !isRecommListSuffi { + return + } + uservisitoffset, err = s.utilGetUpUserMomentVisitOffset(ctx, mid, recommListLength, throughput) + if err != nil { + logger.Error("utilGetUpUserMomentVisitOffset fail, err: %v", err) + return + } + offset = uservisitoffset.MomentRecommOffset + // 向下滚动操作 + case consts.Recomm_Down: + // 若吞吐量比推荐数组长度小,则正常操作,否则认为直接触底 + if !isRecommListSuffi { + return 0, 1, nil + } + uservisitoffset, err = s.utilGetDownUserMomentVisitOffset(ctx, mid, recommListLength, throughput) + if err != nil { + logger.Error("utilGetUpUserMomentVisitOffset fail, err: %v", err) + return + } + if uservisitoffset.GetBottomFlag() == 1 { + return 0, 1, nil + } + offset = uservisitoffset.MomentRecommOffset + } + return +} + +// 根据提供的用户mid和操作类型,从最近推荐列表中获取吞吐量大小的列表,并更新用户游标 +func (s *Service) utilGetMomentRecentListIds(ctx *gin.Context, mid int64, opType int64, throughput int64) (ids []int64, err error) { + + if opType != consts.Recomm_Up { + return + } + + // 从redis中获取动态列表长度 + recentListLength, err := redis.GetRedisClient().LLen(consts.RedisMomentPrefix + "recent_list") + if err != nil { + logger.Error("Redis read failed : %v", err) + return + } + if recentListLength == 0 { + return + } + + // 获取用户游标 + offset, err := redis.GetRedisClient().HGetInt64(consts.RedisMomentPrefix+"recent_list_offset", fmt.Sprint(mid)) + if err != nil && !redis.GetRedisClient().IsErrNil(err) { + logger.Error("Redis read failed : %v", err) + return + } + // 检验游标是否还可用 + ids = make([]int64, 0) + incr := 0 + for ; incr < int(throughput) && offset < recentListLength; incr++ { + id, err := redis.GetRedisClient().LIndexInt64(consts.RedisMomentPrefix+"recent_list", int(offset)) + if err != nil { + logger.Error("Redis read failed : %v", err) + return make([]int64, 0), err + } + ids = append(ids, id) + offset++ + } + _, err = redis.GetRedisClient().HIncrby(consts.RedisMomentPrefix+"recent_list_offset", fmt.Sprint(mid), incr) + if err != nil && !redis.GetRedisClient().IsErrNil(err) { + logger.Error("Redis cache failed : %v", err) + return + } + + return +} + +// 根据提供的用户mid和操作类型,从推荐列表中获取吞吐量大小的列表,并更新用户游标 +func (s *Service) utilGetMomentRecommListIds(ctx *gin.Context, mid int64, opType int64, throughput int64) (ids []int64, err error) { + // 从redis中获取动态列表长度 + recommListLength, err := redis.GetRedisClient().LLen(consts.RedisMomentPrefix + "recomm_list") + if err != nil { + logger.Error("Redis read failed : %v", err) + return + } + if recommListLength == 0 { + return + } + + // 获取用户游标 + offset := int64(0) + + // 获取游标和是否已触底标志 + offset, bottomFlag, err := s.utilGetMomentRecommlistOffsetAndBottomeFlag(ctx, mid, opType, recommListLength, throughput) + if err != nil { + logger.Error("utilGetMomentRecommlistOffsetAndBottomeFlag fail, err: %v", err) + return + } + if bottomFlag == 1 { // 已触底 + return make([]int64, 0), nil + } + + // 向下操作,去尾 + upperBound := throughput + if opType == consts.Recomm_Down { + surplusVolume := recommListLength - offset + if surplusVolume < throughput { + upperBound = surplusVolume + } + } + + // 根据用户游标得到待查询ids + ids = make([]int64, 0) + for i := int64(0); i < upperBound; i++ { + index := (offset + i) % recommListLength + id, err := redis.GetRedisClient().LIndexInt64(consts.RedisMomentPrefix+"recomm_list", int(index)) + if err != nil { + logger.Error("Redis read failed : %v", err) + return make([]int64, 0), err + } + ids = append(ids, id) + } + + return +} + +// 根据提供的ids,查询出动态,并填充主播、是否关注、是否点赞,并保证按照提供的顺序返回ApiMomentVO的list +func (s *Service) utilGetApiMomentVOListByIds(ctx *gin.Context, visitorMid int64, ids []int64) (volist []*momentproto.ApiMomentVO, err error) { + + if len(ids) == 0 { + return make([]*momentproto.ApiMomentVO, 0), nil + } + + list, err := _DefaultMoment.OpListByIds(ctx, &momentproto.OpListByIdsReq{ + Ids: ids, + }) + if err != nil { + logger.Error("_DefaultMoment OpListByIds fail, , err: %v", err) + return + } + + // list -> idList 索引的map,保证不打乱顺序 + _momentIndexMap := make(map[int]int) + for i, account := range list { + id1 := util.DerefInt64(account.Id) + for j, id2 := range ids { + if id1 == id2 { + _momentIndexMap[i] = j + } + } + } + + // 获取访问者的关注列表 + followMap, err := s.utilGetFollowMap(ctx, visitorMid) + if err != nil { + logger.Error("utilGetFollowMap fail") + return + } + + // 填充主播信息 + vos, ec := s.utilFillMomentsStreamerInfo(ctx, list, consts.InterfaceType_Api) + if ec != errcode.ErrCodeMomentSrvOk { + logger.Error("utilFillMomentsStreamerInfo fail, , err: %v", err) + return + } + volist = make([]*momentproto.ApiMomentVO, len(vos)) + for i, vo := range vos { + apiVO, _ := vo.(*momentproto.ApiMomentVO) + // 填充是否关注 + s.utilFillIsFollowedFillable(ctx, followMap, apiVO) + // 填充是否点赞 + if err = s.utilFillIsThumbedUpFillable(ctx, visitorMid, apiVO); err != nil { + logger.Error("utilFillIsThumbedUpFillable fail") + return + } + volist[_momentIndexMap[i]] = apiVO + } + + return +} diff --git a/app/mix/service/xxljob_tasks.go b/app/mix/service/xxljob_tasks.go index 92b477bb..3db429d3 100644 --- a/app/mix/service/xxljob_tasks.go +++ b/app/mix/service/xxljob_tasks.go @@ -4,11 +4,11 @@ import ( "context" "fmt" "service/api/consts" - "service/api/errcode" accountproto "service/api/proto/account/proto" account_cancellationproto "service/api/proto/account_cancellation/proto" contact_customer_service_proto "service/api/proto/contact_customer_service/proto" daily_statementproto "service/api/proto/daily_statement/proto" + momentproto "service/api/proto/moment/proto" streamerproto "service/api/proto/streamer/proto" vasproto "service/api/proto/vas/proto" vericodeproto "service/api/proto/vericode/proto" @@ -31,18 +31,19 @@ import ( func (s *CronService) ReloadRecommList(ctx context.Context, param *xxl.RunReq) (msg string) { logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID)) logger.Info("Refreshing recommendation list cached in redis...") - list, ec := DefaultService.OpGetStreamerList(&gin.Context{}, &streamerproto.OpListReq{ - Sort: "-fans", + list, err := _DefaultStreamer.OpList(&gin.Context{}, &streamerproto.OpListReq{ + Sort: []string{"fans"}, }) - if ec != errcode.ErrCodeAccountRelationSrvOk { - logger.Error("OpGetStreamerList fail, ec: %v", ec) - return fmt.Sprintf("OpGetStreamerList fail, ec: %v", ec) + if err != nil { + logger.Error("OpList fail, err: %v", err) + return fmt.Sprintf("OpList fail, err: %v", err) } midList := make([]int64, len(list)) for i, streamer := range list { midList[i] = util.DerefInt64(streamer.Mid) } - err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", midList, 0) + + err = redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", midList, 0) if err != nil { logger.Error("Redis cache fail, err: %v", err) return fmt.Sprintf("Redis cache fail, err: %v", err) @@ -387,3 +388,38 @@ func (s *CronService) CancelAccountsAtDueTime(ctx context.Context, param *xxl.Ru return "Account cancellation execution finished..." } + +func (s *CronService) ReloadMomentRecommList(ctx context.Context, param *xxl.RunReq) (msg string) { + logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID)) + logger.Info("Refreshing recommendation list cached in redis...") + list, err := _DefaultMoment.OpList(&gin.Context{}, &momentproto.OpListReq{ + CtLowerBound: goproto.Int64(0), + }) + if err != nil { + logger.Error("OpGetMomentList fail, err: %v", err) + return fmt.Sprintf("OpGetMomentList fail, err: %v", err) + } + + // 清缓存 + if err := redis.GetRedisClient().Del(consts.RedisMomentPrefix + "recomm_list"); err != nil { + logger.Error("Del redis cache fail, err: %v", err) + return fmt.Sprintf("Del redis cache fail, err: %v", err) + } + if err := redis.GetRedisClient().Del(consts.RedisMomentPrefix + "recent_list_offset"); err != nil { + logger.Error("Del redis cache fail, err: %v", err) + return fmt.Sprintf("Del redis cache fail, err: %v", err) + } + + // 加载缓存 + for _, moment := range list { + id := util.DerefInt64(moment.Id) + err := redis.GetRedisClient().RPush(consts.RedisMomentPrefix+"recomm_list", id) + if err != nil { + logger.Error("Redis cache fail, err: %v", err) + return fmt.Sprintf("Redis cache fail, err: %v", err) + } + } + + logger.Info("Refresh moment recommendation list cached in redis accomplished...") + return "Refresh moment recommendation list cached in redis accomplished" +} diff --git a/dbstruct/user_moment_visit_offset.go b/dbstruct/user_moment_visit_offset.go new file mode 100644 index 00000000..4879b733 --- /dev/null +++ b/dbstruct/user_moment_visit_offset.go @@ -0,0 +1,15 @@ +package dbstruct + +type UserMomentVisitOffset struct { + Id int64 `json:"id" bson:"_id"` // id,用户的mid + MomentRecommOffset int64 `json:"moment_recomm_offset" bson:"moment_recomm_offset"` // 动态推荐列表偏移量 + BottomFlag *int64 `json:"bottom_flag" bson:"bottom_flag"` // 是否触底标志 + Ver int64 `json:"ver" bson:"ver"` // 乐观锁 +} + +func (p *UserMomentVisitOffset) GetBottomFlag() int64 { + if p == nil || p.BottomFlag == nil { + return 0 + } + return *p.BottomFlag +} diff --git a/dbstruct/user_visit_offset.go b/dbstruct/user_visit_offset.go new file mode 100644 index 00000000..15acfd6d --- /dev/null +++ b/dbstruct/user_visit_offset.go @@ -0,0 +1,15 @@ +package dbstruct + +type UserVisitOffset struct { + Id int64 `json:"id" bson:"_id"` // id,用户的mid + StreamerRecommOffset int64 `json:"streamer_recomm_offset" bson:"streamer_recomm_offset"` // 主播推荐列表偏移量 + BottomFlag *int64 `json:"bottom_flag" bson:"bottom_flag"` // 是否触底标志 + Ver int64 `json:"ver" bson:"ver"` // 乐观锁 +} + +func (p *UserVisitOffset) GetBottomFlag() int64 { + if p == nil || p.BottomFlag == nil { + return 0 + } + return *p.BottomFlag +} diff --git a/library/redis/redis.go b/library/redis/redis.go index 7a481854..b80efdb2 100644 --- a/library/redis/redis.go +++ b/library/redis/redis.go @@ -300,6 +300,12 @@ func (c *Client) HGetAll(key string, val interface{}) error { return err } +// HINCRBY 将保存的整型键值增加 +func (c *Client) HIncrby(key string, field string, val int) (reply interface{}, err error) { + reply, err = c.Do("HINCRBY", c.getKey(key), field, val) + return +} + /** Redis列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边) **/ @@ -476,7 +482,7 @@ func (c *Client) LREM(key string, count int, member interface{}) (int, error) { // LLen 获取列表的长度 func (c *Client) LLen(key string) (int64, error) { - return Int64(c.Do("RPOP", c.getKey(key))) + return Int64(c.Do("LLEN", c.getKey(key))) } // LRange 返回列表 key 中指定区间内的元素,区间以偏移量 start 和 stop 指定。 @@ -487,6 +493,11 @@ func (c *Client) LRange(key string, start, end int) (interface{}, error) { return c.Do("LRANGE", c.getKey(key), start, end) } +// LIndex 返回列表 key 中指定索引的元素。 +func (c *Client) LIndexInt64(key string, index int) (int64, error) { + return Int64(c.Do("LINDEX", c.getKey(key), index)) +} + /** Redis 有序集合和集合一样也是string类型元素的集合,且不允许重复的成员。 不同的是每个元素都会关联一个double类型的分数。redis正是通过分数来为集合中的成员进行从小到大的排序。 @@ -813,3 +824,8 @@ func (c *Client) closePool() { // func init() { // cache.Register("redis", &Client{}) // } + +// 是否是空错误 +func (c *Client) IsErrNil(err error) bool { + return err == redis.ErrNil +}