diff --git a/api/consts/consts.go b/api/consts/consts.go index 5f86b535..c98b5835 100644 --- a/api/consts/consts.go +++ b/api/consts/consts.go @@ -15,6 +15,12 @@ const ( DefaultPageSize = 10 //默认页长为10 ) +// 推荐每次吞吐量 +const ( + StreamerRecommThroughput = 4 + MomentRecommThroughput = 4 +) + // 设备类型 const ( DevType_Android = 0 @@ -70,7 +76,7 @@ const ( RedisStreamerPrefix = "streamer:" //streamer服务前缀 ) -//const PackageRootPath = "C:/Users/PC/Desktop/wishpal_ironfan_service/service" +//const PackageRootPath = "C:/Users/PC/Desktop/service" const PackageRootPath = "/app/wishpal-ironfan" @@ -78,7 +84,7 @@ const MainConfigPath = PackageRootPath + "/etc/mix/mix-test.yaml" const ProductionConfigPath = PackageRootPath + "/etc/mix/mix-prod.yaml" -const LocalConfigPath = "/Users/erwin/wishpal/wishpal-ironfan/etc/mix/mix-local.yaml" +const LocalConfigPath = "C:/Users/PC/Desktop/service/etc/mix/mix-local.yaml" const ReservedUserIdRegexesConfig = PackageRootPath + "/etc/mix/resource/reg_reserved_user_id_config.xml" diff --git a/api/consts/option.go b/api/consts/option.go index 106c1f3e..542a0153 100644 --- a/api/consts/option.go +++ b/api/consts/option.go @@ -9,3 +9,8 @@ const ( TextAuditTaskUpdate_Pass = 0 TextAuditTaskUpdate_Rollback = 1 ) + +const ( + Recomm_Down = 0 + Recomm_Up = 1 +) diff --git a/api/errcode/errcode.go b/api/errcode/errcode.go index 9bf99398..aa9e1c2d 100644 --- a/api/errcode/errcode.go +++ b/api/errcode/errcode.go @@ -150,6 +150,9 @@ var ErrCodeMsgMap = map[ErrCode]string{ ErrCodeMomentAuditTaskSrvFail: "动态审核任务表服务错误", ErrCodeMomentAuditTaskNotExist: "动态审核任务表不存在", + + ErrCodeUserVisitOffsetSrvFail: "用户游标表服务错误", + ErrCodeUserVisitOffsetNotExist: "用户游标表不存在", } const ( @@ -358,6 +361,11 @@ const ( ErrCodeMomentAuditTaskSrvFail ErrCode = -29001 // 动态审核任务表服务错误 ErrCodeMomentAuditTaskNotExist ErrCode = -29002 // 动态审核任务表不存在 + // UserVisitOffset: 30xxx + ErrCodeUserVisitOffsetSrvOk ErrCode = ErrCodeOk + ErrCodeUserVisitOffsetSrvFail ErrCode = -31001 // 用户游标表服务错误 + ErrCodeUserVisitOffsetNotExist ErrCode = -31002 // 用户游标表不存在 + // Media: 60xxx ErrCodeMediaSrvOk ErrCode = ErrCodeOk ErrCodeMediaSrvFail ErrCode = -60001 // 媒体服务错误 diff --git a/api/proto/streamer/proto/streamer_api.go b/api/proto/streamer/proto/streamer_api.go index 409b915c..a8d6b598 100644 --- a/api/proto/streamer/proto/streamer_api.go +++ b/api/proto/streamer/proto/streamer_api.go @@ -180,12 +180,11 @@ type ApiListStreamerWxIdResp struct { // api 推荐 type ApiRecommListReq struct { base.BaseRequest - Offset int `json:"offset"` - Limit int `json:"limit"` + OpType int64 `json:"op_type"` } type ApiRecommListData struct { - RecommList []int64 `json:"recomm_list"` + RecommList []*ApiListExtVO `json:"recomm_list"` } type ApiRecommListResp struct { diff --git a/app/mix/dao/mongo.go b/app/mix/dao/mongo.go index 7ff1516f..977e0f31 100644 --- a/app/mix/dao/mongo.go +++ b/app/mix/dao/mongo.go @@ -154,6 +154,9 @@ const ( DBMomentAuditTask = "moment_audit_task" COLMomentAuditTask = "moment_audit_task" + + DBUserVisitOffset = "user_visit_offset" + COLUserVisitOffset = "user_visit_offset" ) // 商品表 @@ -360,6 +363,11 @@ func (m *Mongo) getColMomentAuditTask() *qmgo.Collection { return m.clientMix.Database(DBMomentAuditTask).Collection(COLMomentAuditTask) } +// 用户访问偏移量表 +func (m *Mongo) getColUserVisitOffset() *qmgo.Collection { + return m.clientMix.Database(DBUserVisitOffset).Collection(COLUserVisitOffset) +} + // 商品相关 func (m *Mongo) CreateProduct(ctx *gin.Context, product *dbstruct.Product) error { col := m.getColProduct() @@ -3297,3 +3305,34 @@ func (m *Mongo) GetAppConfigListByKey(ctx *gin.Context, req *appconfigproto.OpLi } return appconfig, err } + +func (m *Mongo) CreateUserVisitOffset(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error { + col := m.getColUserVisitOffset() + _, err := col.InsertOne(ctx, uservisitoffset) + return err +} + +func (m *Mongo) UpdateUserVisitOffset(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error { + col := m.getColUserVisitOffset() + filter := qmgo.M{ + "_id": uservisitoffset.Id, + "ver": uservisitoffset.Ver, + } + update := qmgo.M{ + "$set": qmgo.M{ + "streamer_recomm_offset": uservisitoffset.StreamerRecommOffset, + }, + } + err := col.UpdateOne(ctx, filter, update) + return err +} + +func (m *Mongo) GetUserVisitOffset(ctx *gin.Context, mid int64) (*dbstruct.UserVisitOffset, error) { + uservisitoffset := &dbstruct.UserVisitOffset{} + col := m.getColUserVisitOffset() + query := qmgo.M{ + "_id": mid, + } + err := col.Find(ctx, query).One(uservisitoffset) + return uservisitoffset, err +} diff --git a/app/mix/service/apiservice.go b/app/mix/service/apiservice.go index cfafedb5..caaf906f 100644 --- a/app/mix/service/apiservice.go +++ b/app/mix/service/apiservice.go @@ -32,7 +32,6 @@ import ( "service/library/contentaudit/imageaudit" "service/library/contentaudit/textaudit" "service/library/logger" - "service/library/redis" "go.mongodb.org/mongo-driver/mongo" goproto "google.golang.org/protobuf/proto" @@ -1206,7 +1205,7 @@ func (s *Service) ApiGetStreamerWxId(ctx *gin.Context, req *streamerproto.ApiLis } // 推荐 -func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto.ApiRecommListReq) (recommlist []int64, ec errcode.ErrCode) { +func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto.ApiRecommListReq) (recommStreamerList []*streamerproto.ApiListExtVO, ec errcode.ErrCode) { ec = errcode.ErrCodeStreamerSrvOk @@ -1218,48 +1217,15 @@ func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto. return nil, errcode.ErrCodeApolloReadFail } - //1.从redis中获取数据 - err = redis.GetRedisClient().GetObject(consts.RedisStreamerPrefix+"recomm_list", &recommlist) + // 从redis中获取主播列表 + recommlist, err := s.utilGetStreamerRecommList(ctx) if err != nil { - logger.Error("Redis read failed : %v", err) - ec = errcode.ErrCodeStreamerRecommListRedisCacheInvalid + logger.Error("utilGetStreamerRecommList fail, err: %v", err) + ec = errcode.ErrCodeStreamerSrvFail return } - //2.若redis命中失败,再从数据库查 - if len(recommlist) == 0 { - logger.Error("Redis hit failed, reading recommendation list from mongo...") - - // recommlist, _, err = _DefaultAccountRelation.OpIsFollowedCount(ctx) - // if err != nil { - // logger.Error("OpIsFollowedCount fail, err: %v", err) - // ec = errcode.ErrCodeAccountRelationSrvFail - // return - // } - - list, err := _DefaultStreamer.OpList(ctx, &streamerproto.OpListReq{ - Sort: "-fans", - }) - if err != nil { - logger.Error("OpList fail, err: %v", err) - ec = errcode.ErrCodeAccountRelationSrvFail - return - } - recommlist = make([]int64, len(list)) - for i, streamer := range list { - recommlist[i] = util.DerefInt64(streamer.Mid) - } - - //若数据库命中成功,则立即加载进redis - if len(recommlist) != 0 { - err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", recommlist, 0) - if err != nil { - logger.Error("Redis cache fail, err: %v", err) - } - - } - } - + // 查询是否限制访客 visitorMid := req.BaseRequest.Mid for _, restricted_visitor_mid := range cfg.RestrictedVisitorMids { if restricted_visitor_mid == visitorMid { // 是限制访问的访客 @@ -1278,6 +1244,45 @@ func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto. } } + // 获取用户游标 + recommListLength := int64(len(recommlist)) + offset, err := s.utilGetUserVisitOffset(ctx, req.Mid, recommListLength) + if err != nil { + logger.Error("utilGetUserVisitOffset fail, err: %v", err) + ec = errcode.ErrCodeUserVisitOffsetSrvFail + return + } + + // 根据用户游标查询得到结果 + midList := make([]int64, 0) + for i := 0; i < consts.StreamerRecommThroughput; i++ { + index := (offset + int64(i)) % recommListLength + midList = append(midList, recommlist[index]) + } + accountList, err := _DefaultAccount.OpListByMids(ctx, &accountproto.OpListByMidsReq{ + Mids: midList, + }) + if err != nil { + logger.Error("_DefaultAccount OpListByMids fail, err: %v", err) + ec = errcode.ErrCodeStreamerSrvFail + return + } + + streamerExts := make([]streamerproto.StreamerExtVO, len(accountList)) + for i := range streamerExts { + streamerExts[i] = &streamerproto.ApiListExtVO{} + } + ec = s.utilExtendAccountsIntoStreamerExts(ctx, accountList, streamerExts) + if ec != errcode.ErrCodeStreamerSrvOk { + logger.Error("Extend accountlist into streamer_exts fail, req: %v, err: %v", util.ToJson(req), err) + return + } + recommStreamerList = make([]*streamerproto.ApiListExtVO, len(streamerExts)) + for i, streamerExt := range streamerExts { + vo, _ := streamerExt.(*streamerproto.ApiListExtVO) + recommStreamerList[i] = vo + } + return } diff --git a/app/mix/service/logic/user_visit_offset.go b/app/mix/service/logic/user_visit_offset.go new file mode 100644 index 00000000..0c83578c --- /dev/null +++ b/app/mix/service/logic/user_visit_offset.go @@ -0,0 +1,49 @@ +package logic + +import ( + "service/app/mix/dao" + "service/dbstruct" + "service/library/logger" + + "github.com/gin-gonic/gin" +) + +type UserVisitOffset struct { + store *dao.Store +} + +func NewUserVisitOffset(store *dao.Store) (a *UserVisitOffset) { + a = &UserVisitOffset{ + store: store, + } + return +} + +func (p *UserVisitOffset) OpCreate(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error { + err := p.store.CreateUserVisitOffset(ctx, uservisitoffset) + if err != nil { + logger.Error("CreateUserVisitOffset fail, err: %v", err) + return err + } + return nil +} + +func (p *UserVisitOffset) OpUpdate(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error { + err := p.store.UpdateUserVisitOffset(ctx, uservisitoffset) + if err != nil { + logger.Error("UpdateUserVisitOffset fail, err: %v", err) + return err + } + return nil +} + +func (p *UserVisitOffset) OpGetUserVisitOffset(ctx *gin.Context, mid int64) (*dbstruct.UserVisitOffset, error) { + + uservisitoffset, err := p.store.GetUserVisitOffset(ctx, mid) + if err != nil { + logger.Error("GetUserVisitOffset fail, err: %v", err) + return nil, err + } + + return uservisitoffset, nil +} diff --git a/app/mix/service/service.go b/app/mix/service/service.go index be24447f..4d9510df 100644 --- a/app/mix/service/service.go +++ b/app/mix/service/service.go @@ -100,6 +100,7 @@ var ( _DefaultXxlJob *logic.XxlJob _DefaultAppConfig *logic.AppConfig _DefaultMomentAuditTask *logic.MomentAuditTask + _DefaultUserVisitOffset *logic.UserVisitOffset ) type Service struct { @@ -176,6 +177,7 @@ func (s *Service) Init(c any) (err error) { _DefaultXxlJob = logic.NewXxlJob(store, cfg.XxlJob) _DefaultAppConfig = logic.NewAppConfig(store) _DefaultMomentAuditTask = logic.NewMomentAuditTask(store) + _DefaultUserVisitOffset = logic.NewUserVisitOffset(store) return } diff --git a/app/mix/service/utilservice.go b/app/mix/service/utilservice.go index 08bb0755..45cf5c6b 100644 --- a/app/mix/service/utilservice.go +++ b/app/mix/service/utilservice.go @@ -21,6 +21,7 @@ import ( "service/bizcommon/util" "service/dbstruct" "service/library/logger" + "service/library/redis" "strings" "time" @@ -626,3 +627,73 @@ func (s *Service) utilStringifyContactCustomerServices(ctx *gin.Context, contact msg = msgBuilder.String() return } + +func (s *Service) utilGetStreamerRecommList(ctx *gin.Context) (recommlist []int64, err error) { + + // 1.从redis中获取数据 + err = redis.GetRedisClient().GetObject(consts.RedisStreamerPrefix+"recomm_list", &recommlist) + if err != nil { + logger.Error("Redis read failed : %v", err) + return + } + + // 2.若redis命中失败,再从数据库查 + if len(recommlist) == 0 { + logger.Error("Redis hit failed, reading recommendation list from mongo...") + + list, err := _DefaultStreamer.OpList(ctx, &streamerproto.OpListReq{ + Sort: "-fans", + }) + if err != nil { + logger.Error("OpList fail, err: %v", err) + return nil, err + } + recommlist = make([]int64, len(list)) + for i, streamer := range list { + recommlist[i] = util.DerefInt64(streamer.Mid) + } + + //若数据库命中成功,则立即加载进redis + if len(recommlist) != 0 { + err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", recommlist, 0) + if err != nil { + logger.Error("Redis cache fail, err: %v", err) + } + } + } + + return +} + +func (s *Service) utilGetUserVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (offset int64, err error) { + offset = int64(0) + uservisitoffset, err := _DefaultUserVisitOffset.OpGetUserVisitOffset(ctx, mid) + if err != nil { + logger.Error("OpGetUserVisitOffset fail, err: %v", err) + return + } + if uservisitoffset == nil { + err = _DefaultUserVisitOffset.OpCreate(ctx, &dbstruct.UserVisitOffset{ + Id: mid, + StreamerRecommOffset: 0, + Ver: 0, + }) + if err != nil { + logger.Error("_DefaultUserVisitOffset OpCreate fail, err: %v", err) + return + } + } else { + offset = uservisitoffset.StreamerRecommOffset + nowoffset := (offset + consts.StreamerRecommThroughput) % recommlistLength + err = _DefaultUserVisitOffset.OpUpdate(ctx, &dbstruct.UserVisitOffset{ + Id: uservisitoffset.Id, + StreamerRecommOffset: nowoffset, + Ver: uservisitoffset.Ver, + }) + if err != nil { + logger.Error("_DefaultUserVisitOffset OpUpdate fail, err: %v", err) + return + } + } + return +} diff --git a/dbstruct/user_visit_offset.go b/dbstruct/user_visit_offset.go new file mode 100644 index 00000000..037e3233 --- /dev/null +++ b/dbstruct/user_visit_offset.go @@ -0,0 +1,7 @@ +package dbstruct + +type UserVisitOffset struct { + Id int64 `json:"id" bson:"_id"` //id,用户的mid + StreamerRecommOffset int64 `json:"streamer_recomm_offset" bson:"streamer_recomm_offset"` //主播推荐列表偏移量 + Ver int64 `json:"ver" bson:"ver"` //乐观锁 +}