diff --git a/api/consts/consts.go b/api/consts/consts.go index c98b5835..a1d3b2b2 100644 --- a/api/consts/consts.go +++ b/api/consts/consts.go @@ -74,6 +74,7 @@ const ( // redis键前缀 const ( RedisStreamerPrefix = "streamer:" //streamer服务前缀 + RedisMomentPrefix = "moment:" //moment服务前缀 ) //const PackageRootPath = "C:/Users/PC/Desktop/service" diff --git a/api/proto/moment/proto/moment_api.go b/api/proto/moment/proto/moment_api.go index 6617f8c9..79fff392 100644 --- a/api/proto/moment/proto/moment_api.go +++ b/api/proto/moment/proto/moment_api.go @@ -140,3 +140,18 @@ type ApiListByIdsResp struct { base.BaseResponse Data *ApiListByIdsData `json:"data"` } + +// api 推荐 +type ApiRecommListReq struct { + base.BaseRequest + OpType int64 `json:"op_type"` +} + +type ApiRecommListData struct { + RecommList []*ApiMomentVO `json:"recomm_list"` +} + +type ApiRecommListResp struct { + base.BaseResponse + Data *ApiRecommListData `json:"data"` +} diff --git a/app/mix/controller/init.go b/app/mix/controller/init.go index d6633c5d..623519a6 100644 --- a/app/mix/controller/init.go +++ b/app/mix/controller/init.go @@ -103,6 +103,7 @@ func Init(r *gin.Engine) { apiMomentGroup.POST("list_by_mids", middleware.JSONParamValidator(momentproto.ApiListByMidsReq{}), middleware.JwtAuthenticator(), ApiGetMomentListByMids) apiMomentGroup.POST("thumbs_up", middleware.JSONParamValidator(momentproto.ApiThumbsUpReq{}), middleware.JwtAuthenticator(), ApiThumbsUpMoment) apiMomentGroup.POST("list_by_ids", middleware.JSONParamValidator(momentproto.ApiListByIdsReq{}), middleware.JwtAuthenticator(), ApiGetMomentListByIds) + apiMomentGroup.POST("recomm_list", middleware.JSONParamValidator(momentproto.ApiRecommListReq{}), middleware.JwtAuthenticator(), ApiGetMomentRecommList) // 足迹 // apiFootPrintGroup := r.Group("/api/footprint", PrepareToC()) diff --git a/app/mix/controller/moment_api.go b/app/mix/controller/moment_api.go index 00314360..53a8c5a0 100644 --- a/app/mix/controller/moment_api.go +++ b/app/mix/controller/moment_api.go @@ -182,3 +182,25 @@ func ApiThumbsUpMoment(ctx *gin.Context) { ReplyOk(ctx, nil) } + +func ApiGetMomentRecommList(ctx *gin.Context) { + req := ctx.MustGet("client_req").(*momentproto.ApiRecommListReq) + + list, ec := service.DefaultService.ApiGetMomentRecommList(ctx, req) + if ec != errcode.ErrCodeMomentSrvOk { + logger.Error("OpGetMomentRecommList fail, req: %v, ec: %v", util.ToJson(req), ec) + ReplyErrCodeMsg(ctx, ec) + return + } + + mediaFillableList := make([]mediafiller.MediaFillable, len(list)) + for i, media := range list { + mediaFillableList[i] = media.MediaComp + } + mediafiller.FillList(ctx, mediaFillableList) + + data := &momentproto.ApiRecommListData{ + RecommList: list, + } + ReplyOk(ctx, data) +} diff --git a/app/mix/dao/mongo.go b/app/mix/dao/mongo.go index 72b44af9..d50b2424 100644 --- a/app/mix/dao/mongo.go +++ b/app/mix/dao/mongo.go @@ -157,6 +157,9 @@ const ( DBUserVisitOffset = "user_visit_offset" COLUserVisitOffset = "user_visit_offset" + + DBUserMomentVisitOffset = "user_moment_visit_offset" + COLUserMomentVisitOffset = "user_moment_visit_offset" ) // 商品表 @@ -368,6 +371,11 @@ func (m *Mongo) getColUserVisitOffset() *qmgo.Collection { return m.clientMix.Database(DBUserVisitOffset).Collection(COLUserVisitOffset) } +// 用户动态访问偏移量表 +func (m *Mongo) getColUserMomentVisitOffset() *qmgo.Collection { + return m.clientMix.Database(DBUserMomentVisitOffset).Collection(COLUserMomentVisitOffset) +} + // 商品相关 func (m *Mongo) CreateProduct(ctx *gin.Context, product *dbstruct.Product) error { col := m.getColProduct() @@ -3344,3 +3352,42 @@ func (m *Mongo) GetUserVisitOffset(ctx *gin.Context, mid int64) (*dbstruct.UserV } return uservisitoffset, err } + +func (m *Mongo) CreateUserMomentVisitOffset(ctx *gin.Context, usermomentvisitoffset *dbstruct.UserMomentVisitOffset) error { + col := m.getColUserMomentVisitOffset() + _, err := col.InsertOne(ctx, usermomentvisitoffset) + return err +} + +func (m *Mongo) UpdateUserMomentVisitOffset(ctx *gin.Context, usermomentvisitoffset *dbstruct.UserMomentVisitOffset) error { + col := m.getColUserMomentVisitOffset() + filter := qmgo.M{ + "_id": usermomentvisitoffset.Id, + "ver": usermomentvisitoffset.Ver, + } + set := qmgo.M{ + "moment_recomm_offset": usermomentvisitoffset.MomentRecommOffset, + } + if usermomentvisitoffset.BottomFlag != nil { + set["bottom_flag"] = usermomentvisitoffset.GetBottomFlag() + } + update := qmgo.M{ + "$set": set, + } + err := col.UpdateOne(ctx, filter, update) + return err +} + +func (m *Mongo) GetUserMomentVisitOffset(ctx *gin.Context, mid int64) (*dbstruct.UserMomentVisitOffset, error) { + usermomentvisitoffset := &dbstruct.UserMomentVisitOffset{} + col := m.getColUserMomentVisitOffset() + query := qmgo.M{ + "_id": mid, + } + err := col.Find(ctx, query).One(usermomentvisitoffset) + if err == qmgo.ErrNoSuchDocuments { + err = nil + return nil, err + } + return usermomentvisitoffset, err +} diff --git a/app/mix/service/apiservice.go b/app/mix/service/apiservice.go index c985b32e..ae7fd7b3 100644 --- a/app/mix/service/apiservice.go +++ b/app/mix/service/apiservice.go @@ -1882,6 +1882,29 @@ func (s *Service) ApiThumbsUpMoment(ctx *gin.Context, req *momentproto.ApiThumbs return } +// 推荐 +func (s *Service) ApiGetMomentRecommList(ctx *gin.Context, req *momentproto.ApiRecommListReq) (recommMomentList []*momentproto.ApiMomentVO, ec errcode.ErrCode) { + + ec = errcode.ErrCodeMomentSrvOk + + // 从redis中获取主播列表 + recommlist, err := s.utilGetMomentRecommList(ctx) + if err != nil { + logger.Error("utilGetMomentRecommList fail, err: %v", err) + ec = errcode.ErrCodeMomentSrvFail + return + } + + recommMomentList, err = s.utilGetMomentRecommListVO(ctx, recommlist, req.Mid, req.OpType) + if err != nil { + logger.Error("utilGetUpMomentRecommList fail, err: %v", err) + ec = errcode.ErrCodeMomentSrvFail + return + } + + return +} + func (s *Service) ApiGetThumbsUpList(ctx *gin.Context, req *thumbsupproto.ApiListReq) (list []*dbstruct.ThumbsUp, ec errcode.ErrCode) { ec = errcode.ErrCodeThumbsUpSrvOk diff --git a/app/mix/service/cronservice.go b/app/mix/service/cronservice.go index fcc24b6b..f810646b 100644 --- a/app/mix/service/cronservice.go +++ b/app/mix/service/cronservice.go @@ -54,6 +54,7 @@ func (s *CronService) Init(c any) (err error) { exec.RegTask("text_audit_batch_his", s.TextAuditBatchHis) //exec.RegTask("clear_content_audit_batch_execution_logs", s.ClearContentAuditBatchExecutionLogs) exec.RegTask("send_contact_customer_services_of_last_minute", s.SendContactCustomerServicesOfLastMinute) + exec.RegTask("reload_moment_recomm_list", s.ReloadMomentRecommList) exec.LogHandler(customLogHandle) //注册任务handler diff --git a/app/mix/service/logic/user_moment_visit_offset.go b/app/mix/service/logic/user_moment_visit_offset.go new file mode 100644 index 00000000..f75a961c --- /dev/null +++ b/app/mix/service/logic/user_moment_visit_offset.go @@ -0,0 +1,49 @@ +package logic + +import ( + "service/app/mix/dao" + "service/dbstruct" + "service/library/logger" + + "github.com/gin-gonic/gin" +) + +type UserMomentVisitOffset struct { + store *dao.Store +} + +func NewUserMomentVisitOffset(store *dao.Store) (a *UserMomentVisitOffset) { + a = &UserMomentVisitOffset{ + store: store, + } + return +} + +func (p *UserMomentVisitOffset) OpCreate(ctx *gin.Context, uservisitoffset *dbstruct.UserMomentVisitOffset) error { + err := p.store.CreateUserMomentVisitOffset(ctx, uservisitoffset) + if err != nil { + logger.Error("CreateUserMomentVisitOffset fail, err: %v", err) + return err + } + return nil +} + +func (p *UserMomentVisitOffset) OpUpdate(ctx *gin.Context, uservisitoffset *dbstruct.UserMomentVisitOffset) error { + err := p.store.UpdateUserMomentVisitOffset(ctx, uservisitoffset) + if err != nil { + logger.Error("UpdateUserMomentVisitOffset fail, err: %v", err) + return err + } + return nil +} + +func (p *UserMomentVisitOffset) OpGetUserMomentVisitOffset(ctx *gin.Context, mid int64) (*dbstruct.UserMomentVisitOffset, error) { + + uservisitoffset, err := p.store.GetUserMomentVisitOffset(ctx, mid) + if err != nil { + logger.Error("GetUserMomentVisitOffset fail, err: %v", err) + return nil, err + } + + return uservisitoffset, nil +} diff --git a/app/mix/service/service.go b/app/mix/service/service.go index 4d9510df..29eadb98 100644 --- a/app/mix/service/service.go +++ b/app/mix/service/service.go @@ -101,6 +101,7 @@ var ( _DefaultAppConfig *logic.AppConfig _DefaultMomentAuditTask *logic.MomentAuditTask _DefaultUserVisitOffset *logic.UserVisitOffset + _DefaultUserMomentVisitOffset *logic.UserMomentVisitOffset ) type Service struct { @@ -178,6 +179,7 @@ func (s *Service) Init(c any) (err error) { _DefaultAppConfig = logic.NewAppConfig(store) _DefaultMomentAuditTask = logic.NewMomentAuditTask(store) _DefaultUserVisitOffset = logic.NewUserVisitOffset(store) + _DefaultUserMomentVisitOffset = logic.NewUserMomentVisitOffset(store) return } diff --git a/app/mix/service/utilservice.go b/app/mix/service/utilservice.go index ffbbfda6..950a77f2 100644 --- a/app/mix/service/utilservice.go +++ b/app/mix/service/utilservice.go @@ -889,3 +889,261 @@ func (s *Service) utilGetStreamerRecommListVO(ctx *gin.Context, recommlist []int return } + +func (s *Service) utilGetMomentRecommList(ctx *gin.Context) (recommlist []int64, err error) { + + // 1.从redis中获取数据 + err = redis.GetRedisClient().GetObject(consts.RedisMomentPrefix+"recomm_list", &recommlist) + if err != nil { + logger.Error("Redis read failed : %v", err) + return + } + + // 2.若redis命中失败,再从数据库查 + if len(recommlist) == 0 { + logger.Error("Redis hit failed, reading recommendation list from mongo...") + + list, err := _DefaultMoment.OpList(ctx, &momentproto.OpListReq{ + CtLowerBound: goproto.Int64(0), + }) + if err != nil { + logger.Error("OpList fail, err: %v", err) + return nil, err + } + recommlist = make([]int64, len(list)) + for i, moment := range list { + recommlist[i] = util.DerefInt64(moment.Id) + } + + //若数据库命中成功,则立即加载进redis + if len(recommlist) != 0 { + err := redis.GetRedisClient().Set(consts.RedisMomentPrefix+"recomm_list", recommlist, 0) + if err != nil { + logger.Error("Redis cache fail, err: %v", err) + } + } + } + + return +} + +func (s *Service) utilGetInitUserMomentVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (err error) { + uservisitoffset, err := _DefaultUserMomentVisitOffset.OpGetUserMomentVisitOffset(ctx, mid) + if err != nil { + logger.Error("OpGetUserMomentVisitOffset fail, err: %v", err) + return + } + + var execFunc func(*gin.Context, *dbstruct.UserMomentVisitOffset) error + + if uservisitoffset == nil { + execFunc = _DefaultUserMomentVisitOffset.OpCreate + } else { + execFunc = _DefaultUserMomentVisitOffset.OpUpdate + } + + // 吞吐量大于等于推荐数组长度,则这次获取后已经触底 + if consts.MomentRecommThroughput >= recommlistLength { + err = execFunc(ctx, &dbstruct.UserMomentVisitOffset{ + Id: mid, + MomentRecommOffset: 0, + BottomFlag: goproto.Int64(1), + Ver: 0, + }) + if err != nil { + logger.Error("_DefaultUserMomentVisitOffset OpCreate fail, err: %v", err) + return + } + } else { + err = execFunc(ctx, &dbstruct.UserMomentVisitOffset{ + Id: mid, + MomentRecommOffset: consts.MomentRecommThroughput, + BottomFlag: goproto.Int64(0), + Ver: 0, + }) + if err != nil { + logger.Error("_DefaultUserMomentVisitOffset OpCreate fail, err: %v", err) + return + } + } + + return nil +} + +func (s *Service) utilGetUpUserMomentVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (uservisitoffset *dbstruct.UserMomentVisitOffset, err error) { + uservisitoffset, err = _DefaultUserMomentVisitOffset.OpGetUserMomentVisitOffset(ctx, mid) + if err != nil { + logger.Error("OpGetUserMomentVisitOffset fail, err: %v", err) + return + } + if uservisitoffset == nil { + // 默认用户刚进推荐页面的第一次操作一定是冷启动操作,游标也由冷启动创建 + return nil, qmgo.ErrNoSuchDocuments + } else { + nowoffset := (uservisitoffset.MomentRecommOffset + consts.MomentRecommThroughput) % recommlistLength + // 向上操作固定清掉触底标志 + var bottomFlagPtr *int64 = nil + if uservisitoffset.GetBottomFlag() == 1 { + bottomFlagPtr = goproto.Int64(0) + } + err = _DefaultUserMomentVisitOffset.OpUpdate(ctx, &dbstruct.UserMomentVisitOffset{ + Id: uservisitoffset.Id, + MomentRecommOffset: nowoffset, + BottomFlag: bottomFlagPtr, + Ver: uservisitoffset.Ver, + }) + if err != nil { + logger.Error("_DefaultUserMomentVisitOffset OpUpdate fail, err: %v", err) + return + } + } + return +} + +func (s *Service) utilGetDownUserMomentVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (uservisitoffset *dbstruct.UserMomentVisitOffset, err error) { + uservisitoffset, err = _DefaultUserMomentVisitOffset.OpGetUserMomentVisitOffset(ctx, mid) + if err != nil { + logger.Error("OpGetUserMomentVisitOffset fail, err: %v", err) + return + } + + if uservisitoffset == nil { + // 默认用户刚进推荐页面的第一次操作一定是冷启动操作,游标也由冷启动创建 + return nil, qmgo.ErrNoSuchDocuments + } else { + if uservisitoffset.GetBottomFlag() == 1 { + return + } + offset := uservisitoffset.MomentRecommOffset + if offset+consts.MomentRecommThroughput >= recommlistLength { + err = _DefaultUserMomentVisitOffset.OpUpdate(ctx, &dbstruct.UserMomentVisitOffset{ + Id: uservisitoffset.Id, + MomentRecommOffset: 0, + BottomFlag: goproto.Int64(1), + Ver: uservisitoffset.Ver, + }) + if err != nil { + logger.Error("_DefaultUserMomentVisitOffset OpUpdate fail, err: %v", err) + return + } + } else { + err = _DefaultUserMomentVisitOffset.OpUpdate(ctx, &dbstruct.UserMomentVisitOffset{ + Id: uservisitoffset.Id, + MomentRecommOffset: offset + consts.MomentRecommThroughput, + Ver: uservisitoffset.Ver, + }) + if err != nil { + logger.Error("_DefaultUserMomentVisitOffset OpUpdate fail, err: %v", err) + return + } + } + } + return +} + +// 获取该用户当前在主播推荐数组中的游标和是否已经触底标志 +func (s *Service) utilGetMomentRecommlistOffsetAndBottomeFlag(ctx *gin.Context, recommListLength int64, mid int64, opType int64) (offset int64, bottomFlag int64, err error) { + offset = int64(0) + var uservisitoffset *dbstruct.UserMomentVisitOffset + + // 初始化操作 + switch opType { + case consts.Recomm_Init: + err = s.utilGetInitUserMomentVisitOffset(ctx, mid, int64(recommListLength)) + if err != nil { + logger.Error("utilGetInitUserMomentVisitOffset fail, err: %v", err) + return + } + // 向上滚动操作 + case consts.Recomm_Up: + // 若吞吐量比推荐数组长度小,则正常操作,否则认为游标永远是0 + if consts.MomentRecommThroughput < recommListLength { + uservisitoffset, err = s.utilGetUpUserMomentVisitOffset(ctx, mid, int64(recommListLength)) + if err != nil { + logger.Error("utilGetUpUserMomentVisitOffset fail, err: %v", err) + return + } + offset = uservisitoffset.MomentRecommOffset + } + // 向下滚动操作 + case consts.Recomm_Down: + // 若吞吐量比推荐数组长度小,则正常操作,否则认为直接触底 + if consts.MomentRecommThroughput < recommListLength { + uservisitoffset, err = s.utilGetDownUserMomentVisitOffset(ctx, mid, int64(recommListLength)) + if err != nil { + logger.Error("utilGetUpUserMomentVisitOffset fail, err: %v", err) + return + } + if uservisitoffset.GetBottomFlag() == 1 { + return 0, 1, nil + } + offset = uservisitoffset.MomentRecommOffset + } else { + return 0, 1, nil + } + } + return +} + +func (s *Service) utilGetMomentRecommListVO(ctx *gin.Context, recommlist []int64, mid int64, opType int64) (recommMomentList []*momentproto.ApiMomentVO, err error) { + // 获取用户游标 + offset := int64(0) + recommListLength := len(recommlist) + upperBound := recommListLength + + offset, bottomFlag, err := s.utilGetMomentRecommlistOffsetAndBottomeFlag(ctx, int64(recommListLength), mid, opType) + if err != nil { + logger.Error("utilGetMomentRecommlistOffsetAndBottomeFlag fail, err: %v", err) + return + } + if bottomFlag == 1 { // 已触底 + return make([]*momentproto.ApiMomentVO, 0), nil + } + + if consts.MomentRecommThroughput < recommListLength { + upperBound = consts.MomentRecommThroughput + } + + // 根据用户游标查询得到结果 + midList := make([]int64, 0) + for i := 0; i < upperBound; i++ { + index := (offset + int64(i)) % int64(recommListLength) + midList = append(midList, recommlist[index]) + } + + list, err := _DefaultMoment.OpListByIds(ctx, &momentproto.OpListByIdsReq{ + Ids: midList, + }) + if err != nil { + logger.Error("_DefaultMoment OpListByIds fail, , err: %v", err) + return + } + + // 获取访问者的关注列表 + followMap, err := s.utilGetFollowMap(ctx, mid) + if err != nil { + logger.Error("utilGetFollowMap fail") + return + } + + // 填充主播信息 + vos, ec := s.utilFillMomentsStreamerInfo(ctx, list, consts.InterfaceType_Api) + if ec != errcode.ErrCodeMomentSrvOk { + logger.Error("utilFillMomentsStreamerInfo fail, , err: %v", err) + return + } + recommMomentList = make([]*momentproto.ApiMomentVO, 0) + for _, vo := range vos { + apiVO, _ := vo.(*momentproto.ApiMomentVO) + // 填充是否关注 + s.utilFillIsFollowedFillable(ctx, followMap, apiVO) + // 填充是否点赞 + if err = s.utilFillIsThumbedUpFillable(ctx, mid, apiVO); err != nil { + logger.Error("utilFillIsThumbedUpFillable fail") + return + } + recommMomentList = append(recommMomentList, apiVO) + } + + return +} diff --git a/app/mix/service/xxljob_tasks.go b/app/mix/service/xxljob_tasks.go index 94427118..c1cc4544 100644 --- a/app/mix/service/xxljob_tasks.go +++ b/app/mix/service/xxljob_tasks.go @@ -8,6 +8,7 @@ import ( accountproto "service/api/proto/account/proto" contact_customer_service_proto "service/api/proto/contact_customer_service/proto" daily_statementproto "service/api/proto/daily_statement/proto" + momentproto "service/api/proto/moment/proto" streamerproto "service/api/proto/streamer/proto" vasproto "service/api/proto/vas/proto" "service/bizcommon/util" @@ -297,3 +298,26 @@ func (s *CronService) SendContactCustomerServicesOfLastMinute(ctx context.Contex return "Message send success" } + +func (s *CronService) ReloadMomentRecommList(ctx context.Context, param *xxl.RunReq) (msg string) { + logger.Info("task %v param: %v log_id: %v", param.ExecutorHandler, param.ExecutorParams, xxl.Int64ToStr(param.LogID)) + logger.Info("Refreshing recommendation list cached in redis...") + list, err := _DefaultMoment.OpList(&gin.Context{}, &momentproto.OpListReq{ + CtLowerBound: goproto.Int64(0), + }) + if err != nil { + logger.Error("OpGetMomentList fail, err: %v", err) + return fmt.Sprintf("OpGetMomentList fail, err: %v", err) + } + momentIdList := make([]int64, len(list)) + for i, moment := range list { + momentIdList[i] = util.DerefInt64(moment.Id) + } + err = redis.GetRedisClient().Set(consts.RedisMomentPrefix+"recomm_list", momentIdList, 0) + if err != nil { + logger.Error("Redis cache fail, err: %v", err) + return fmt.Sprintf("Redis cache fail, err: %v", err) + } + logger.Info("Refresh moment recommendation list cached in redis accomplished...") + return "Refresh moment recommendation list cached in redis accomplished" +} diff --git a/dbstruct/user_moment_visit_offset.go b/dbstruct/user_moment_visit_offset.go new file mode 100644 index 00000000..4879b733 --- /dev/null +++ b/dbstruct/user_moment_visit_offset.go @@ -0,0 +1,15 @@ +package dbstruct + +type UserMomentVisitOffset struct { + Id int64 `json:"id" bson:"_id"` // id,用户的mid + MomentRecommOffset int64 `json:"moment_recomm_offset" bson:"moment_recomm_offset"` // 动态推荐列表偏移量 + BottomFlag *int64 `json:"bottom_flag" bson:"bottom_flag"` // 是否触底标志 + Ver int64 `json:"ver" bson:"ver"` // 乐观锁 +} + +func (p *UserMomentVisitOffset) GetBottomFlag() int64 { + if p == nil || p.BottomFlag == nil { + return 0 + } + return *p.BottomFlag +}