diff --git a/api/consts/consts.go b/api/consts/consts.go index 5f86b535..c98b5835 100644 --- a/api/consts/consts.go +++ b/api/consts/consts.go @@ -15,6 +15,12 @@ const ( DefaultPageSize = 10 //默认页长为10 ) +// 推荐每次吞吐量 +const ( + StreamerRecommThroughput = 4 + MomentRecommThroughput = 4 +) + // 设备类型 const ( DevType_Android = 0 @@ -70,7 +76,7 @@ const ( RedisStreamerPrefix = "streamer:" //streamer服务前缀 ) -//const PackageRootPath = "C:/Users/PC/Desktop/wishpal_ironfan_service/service" +//const PackageRootPath = "C:/Users/PC/Desktop/service" const PackageRootPath = "/app/wishpal-ironfan" @@ -78,7 +84,7 @@ const MainConfigPath = PackageRootPath + "/etc/mix/mix-test.yaml" const ProductionConfigPath = PackageRootPath + "/etc/mix/mix-prod.yaml" -const LocalConfigPath = "/Users/erwin/wishpal/wishpal-ironfan/etc/mix/mix-local.yaml" +const LocalConfigPath = "C:/Users/PC/Desktop/service/etc/mix/mix-local.yaml" const ReservedUserIdRegexesConfig = PackageRootPath + "/etc/mix/resource/reg_reserved_user_id_config.xml" diff --git a/api/consts/option.go b/api/consts/option.go index 106c1f3e..542a0153 100644 --- a/api/consts/option.go +++ b/api/consts/option.go @@ -9,3 +9,8 @@ const ( TextAuditTaskUpdate_Pass = 0 TextAuditTaskUpdate_Rollback = 1 ) + +const ( + Recomm_Down = 0 + Recomm_Up = 1 +) diff --git a/api/errcode/errcode.go b/api/errcode/errcode.go index 3503eab8..0a51077f 100644 --- a/api/errcode/errcode.go +++ b/api/errcode/errcode.go @@ -157,6 +157,9 @@ var ErrCodeMsgMap = map[ErrCode]string{ ErrCodeAccountCancellationSrvFail: "账户注销服务错误", ErrCodeAccountCancellationNotExist: "账户注销不存在", + + ErrCodeUserVisitOffsetSrvFail: "用户游标表服务错误", + ErrCodeUserVisitOffsetNotExist: "用户游标表不存在", } const ( @@ -374,6 +377,11 @@ const ( ErrCodeAccountCancellationSrvFail ErrCode = -30001 // 账户注销服务错误 ErrCodeAccountCancellationNotExist ErrCode = -30002 // 账户注销不存在 + // UserVisitOffset: 30xxx + ErrCodeUserVisitOffsetSrvOk ErrCode = ErrCodeOk + ErrCodeUserVisitOffsetSrvFail ErrCode = -31001 // 用户游标表服务错误 + ErrCodeUserVisitOffsetNotExist ErrCode = -31002 // 用户游标表不存在 + // Media: 60xxx ErrCodeMediaSrvOk ErrCode = ErrCodeOk ErrCodeMediaSrvFail ErrCode = -60001 // 媒体服务错误 diff --git a/api/proto/streamer/proto/streamer_api.go b/api/proto/streamer/proto/streamer_api.go index 409b915c..a8d6b598 100644 --- a/api/proto/streamer/proto/streamer_api.go +++ b/api/proto/streamer/proto/streamer_api.go @@ -180,12 +180,11 @@ type ApiListStreamerWxIdResp struct { // api 推荐 type ApiRecommListReq struct { base.BaseRequest - Offset int `json:"offset"` - Limit int `json:"limit"` + OpType int64 `json:"op_type"` } type ApiRecommListData struct { - RecommList []int64 `json:"recomm_list"` + RecommList []*ApiListExtVO `json:"recomm_list"` } type ApiRecommListResp struct { diff --git a/app/mix/dao/mongo.go b/app/mix/dao/mongo.go index edba2cec..91f5fac1 100644 --- a/app/mix/dao/mongo.go +++ b/app/mix/dao/mongo.go @@ -160,6 +160,9 @@ const ( DBAccountCancellation = "account_cancellation" COLAccountCancellation = "account_cancellation" + + DBUserVisitOffset = "user_visit_offset" + COLUserVisitOffset = "user_visit_offset" ) // 商品表 @@ -381,6 +384,11 @@ func (m *Mongo) getColAccountCancellation() *qmgo.Collection { return m.clientMix.Database(DBAccountCancellation).Collection(COLAccountCancellation) } +// 用户访问偏移量表 +func (m *Mongo) getColUserVisitOffset() *qmgo.Collection { + return m.clientMix.Database(DBUserVisitOffset).Collection(COLUserVisitOffset) +} + // 商品相关 func (m *Mongo) CreateProduct(ctx *gin.Context, product *dbstruct.Product) error { col := m.getColProduct() @@ -3469,3 +3477,38 @@ func (m *Mongo) GetAccountCancellationListByMid(ctx *gin.Context, req *account_c } return accountCancellation, err } + +func (m *Mongo) CreateUserVisitOffset(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error { + col := m.getColUserVisitOffset() + _, err := col.InsertOne(ctx, uservisitoffset) + return err +} + +func (m *Mongo) UpdateUserVisitOffset(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error { + col := m.getColUserVisitOffset() + filter := qmgo.M{ + "_id": uservisitoffset.Id, + "ver": uservisitoffset.Ver, + } + update := qmgo.M{ + "$set": qmgo.M{ + "streamer_recomm_offset": uservisitoffset.StreamerRecommOffset, + }, + } + err := col.UpdateOne(ctx, filter, update) + return err +} + +func (m *Mongo) GetUserVisitOffset(ctx *gin.Context, mid int64) (*dbstruct.UserVisitOffset, error) { + uservisitoffset := &dbstruct.UserVisitOffset{} + col := m.getColUserVisitOffset() + query := qmgo.M{ + "_id": mid, + } + err := col.Find(ctx, query).One(uservisitoffset) + if err == qmgo.ErrNoSuchDocuments { + err = nil + return nil, err + } + return uservisitoffset, err +} diff --git a/app/mix/service/apiservice.go b/app/mix/service/apiservice.go index f1742708..5ff25ba3 100644 --- a/app/mix/service/apiservice.go +++ b/app/mix/service/apiservice.go @@ -33,7 +33,6 @@ import ( "service/library/contentaudit/imageaudit" "service/library/contentaudit/textaudit" "service/library/logger" - "service/library/redis" "time" "go.mongodb.org/mongo-driver/mongo" @@ -1339,7 +1338,7 @@ func (s *Service) ApiGetStreamerWxId(ctx *gin.Context, req *streamerproto.ApiLis } // 推荐 -func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto.ApiRecommListReq) (recommlist []int64, ec errcode.ErrCode) { +func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto.ApiRecommListReq) (recommStreamerList []*streamerproto.ApiListExtVO, ec errcode.ErrCode) { ec = errcode.ErrCodeStreamerSrvOk @@ -1351,48 +1350,15 @@ func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto. return nil, errcode.ErrCodeApolloReadFail } - //1.从redis中获取数据 - err = redis.GetRedisClient().GetObject(consts.RedisStreamerPrefix+"recomm_list", &recommlist) + // 从redis中获取主播列表 + recommlist, err := s.utilGetStreamerRecommList(ctx) if err != nil { - logger.Error("Redis read failed : %v", err) - ec = errcode.ErrCodeStreamerRecommListRedisCacheInvalid + logger.Error("utilGetStreamerRecommList fail, err: %v", err) + ec = errcode.ErrCodeStreamerSrvFail return } - //2.若redis命中失败,再从数据库查 - if len(recommlist) == 0 { - logger.Error("Redis hit failed, reading recommendation list from mongo...") - - // recommlist, _, err = _DefaultAccountRelation.OpIsFollowedCount(ctx) - // if err != nil { - // logger.Error("OpIsFollowedCount fail, err: %v", err) - // ec = errcode.ErrCodeAccountRelationSrvFail - // return - // } - - list, err := _DefaultStreamer.OpList(ctx, &streamerproto.OpListReq{ - Sort: "-fans", - }) - if err != nil { - logger.Error("OpList fail, err: %v", err) - ec = errcode.ErrCodeAccountRelationSrvFail - return - } - recommlist = make([]int64, len(list)) - for i, streamer := range list { - recommlist[i] = util.DerefInt64(streamer.Mid) - } - - //若数据库命中成功,则立即加载进redis - if len(recommlist) != 0 { - err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", recommlist, 0) - if err != nil { - logger.Error("Redis cache fail, err: %v", err) - } - - } - } - + // 查询是否限制访客 visitorMid := req.BaseRequest.Mid for _, restricted_visitor_mid := range cfg.RestrictedVisitorMids { if restricted_visitor_mid == visitorMid { // 是限制访问的访客 @@ -1411,6 +1377,22 @@ func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto. } } + if req.OpType == consts.Recomm_Up { + recommStreamerList, err = s.utilGetUpStreamerRecommList(ctx, recommlist, req.Mid) + if err != nil { + logger.Error("utilGetUpStreamerRecommList fail, err: %v", err) + ec = errcode.ErrCodeStreamerSrvFail + return + } + } else { + recommStreamerList, err = s.utilGetUpStreamerRecommList(ctx, recommlist, req.Mid) + if err != nil { + logger.Error("utilGetUpStreamerRecommList fail, err: %v", err) + ec = errcode.ErrCodeStreamerSrvFail + return + } + } + return } diff --git a/app/mix/service/logic/user_visit_offset.go b/app/mix/service/logic/user_visit_offset.go new file mode 100644 index 00000000..0c83578c --- /dev/null +++ b/app/mix/service/logic/user_visit_offset.go @@ -0,0 +1,49 @@ +package logic + +import ( + "service/app/mix/dao" + "service/dbstruct" + "service/library/logger" + + "github.com/gin-gonic/gin" +) + +type UserVisitOffset struct { + store *dao.Store +} + +func NewUserVisitOffset(store *dao.Store) (a *UserVisitOffset) { + a = &UserVisitOffset{ + store: store, + } + return +} + +func (p *UserVisitOffset) OpCreate(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error { + err := p.store.CreateUserVisitOffset(ctx, uservisitoffset) + if err != nil { + logger.Error("CreateUserVisitOffset fail, err: %v", err) + return err + } + return nil +} + +func (p *UserVisitOffset) OpUpdate(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error { + err := p.store.UpdateUserVisitOffset(ctx, uservisitoffset) + if err != nil { + logger.Error("UpdateUserVisitOffset fail, err: %v", err) + return err + } + return nil +} + +func (p *UserVisitOffset) OpGetUserVisitOffset(ctx *gin.Context, mid int64) (*dbstruct.UserVisitOffset, error) { + + uservisitoffset, err := p.store.GetUserVisitOffset(ctx, mid) + if err != nil { + logger.Error("GetUserVisitOffset fail, err: %v", err) + return nil, err + } + + return uservisitoffset, nil +} diff --git a/app/mix/service/service.go b/app/mix/service/service.go index 1285ab6e..8b8af328 100644 --- a/app/mix/service/service.go +++ b/app/mix/service/service.go @@ -102,6 +102,7 @@ var ( _DefaultXxlJob *logic.XxlJob _DefaultMomentAuditTask *logic.MomentAuditTask _DefaultAccountCancellation *logic.AccountCancellation + _DefaultUserVisitOffset *logic.UserVisitOffset ) type Service struct { @@ -179,6 +180,7 @@ func (s *Service) Init(c any) (err error) { _DefaultXxlJob = logic.NewXxlJob(store, cfg.XxlJob) _DefaultMomentAuditTask = logic.NewMomentAuditTask(store) _DefaultAccountCancellation = logic.NewAccountCancellation(store) + _DefaultUserVisitOffset = logic.NewUserVisitOffset(store) return } diff --git a/app/mix/service/utilservice.go b/app/mix/service/utilservice.go index 897df529..9cf8606d 100644 --- a/app/mix/service/utilservice.go +++ b/app/mix/service/utilservice.go @@ -2,6 +2,7 @@ package service import ( "fmt" + "math" "service/api/consts" "service/api/errcode" "service/api/interfaces" @@ -23,6 +24,7 @@ import ( "service/dbstruct" "service/library/apollo" "service/library/logger" + "service/library/redis" "strings" "time" @@ -721,3 +723,166 @@ func (s *Service) utilCancelAccountByMids(ctx *gin.Context, midList []int64) err return nil } + +func (s *Service) utilGetStreamerRecommList(ctx *gin.Context) (recommlist []int64, err error) { + + // 1.从redis中获取数据 + err = redis.GetRedisClient().GetObject(consts.RedisStreamerPrefix+"recomm_list", &recommlist) + if err != nil { + logger.Error("Redis read failed : %v", err) + return + } + + // 2.若redis命中失败,再从数据库查 + if len(recommlist) == 0 { + logger.Error("Redis hit failed, reading recommendation list from mongo...") + + list, err := _DefaultStreamer.OpList(ctx, &streamerproto.OpListReq{ + Sort: "-fans", + }) + if err != nil { + logger.Error("OpList fail, err: %v", err) + return nil, err + } + recommlist = make([]int64, len(list)) + for i, streamer := range list { + recommlist[i] = util.DerefInt64(streamer.Mid) + } + + //若数据库命中成功,则立即加载进redis + if len(recommlist) != 0 { + err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", recommlist, 0) + if err != nil { + logger.Error("Redis cache fail, err: %v", err) + } + } + } + + return +} + +func (s *Service) utilGetUserVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (offset int64, err error) { + offset = int64(0) + uservisitoffset, err := _DefaultUserVisitOffset.OpGetUserVisitOffset(ctx, mid) + if err != nil { + logger.Error("OpGetUserVisitOffset fail, err: %v", err) + return + } + if uservisitoffset == nil { + nowoffset := (offset + consts.StreamerRecommThroughput) % recommlistLength + err = _DefaultUserVisitOffset.OpCreate(ctx, &dbstruct.UserVisitOffset{ + Id: mid, + StreamerRecommOffset: nowoffset, + Ver: 0, + }) + if err != nil { + logger.Error("_DefaultUserVisitOffset OpCreate fail, err: %v", err) + return + } + } else { + offset = uservisitoffset.StreamerRecommOffset + nowoffset := (offset + consts.StreamerRecommThroughput) % recommlistLength + err = _DefaultUserVisitOffset.OpUpdate(ctx, &dbstruct.UserVisitOffset{ + Id: uservisitoffset.Id, + StreamerRecommOffset: nowoffset, + Ver: uservisitoffset.Ver, + }) + if err != nil { + logger.Error("_DefaultUserVisitOffset OpUpdate fail, err: %v", err) + return + } + } + return +} + +func (s *Service) utilGetUpStreamerRecommList(ctx *gin.Context, recommlist []int64, mid int64) (recommStreamerList []*streamerproto.ApiListExtVO, err error) { + // 获取用户游标 + offset := int64(0) + recommListLength := int64(len(recommlist)) + if consts.StreamerRecommThroughput < recommListLength { + offset, err = s.utilGetUserVisitOffset(ctx, mid, recommListLength) + if err != nil { + logger.Error("utilGetUserVisitOffset fail, err: %v", err) + return + } + } + + // 根据用户游标查询得到结果 + midList := make([]int64, 0) + for i := 0; i < int(math.Min(float64(consts.StreamerRecommThroughput), float64(recommListLength))); i++ { + index := (offset + int64(i)) % recommListLength + midList = append(midList, recommlist[index]) + } + accountList, err := _DefaultAccount.OpListByMids(ctx, &accountproto.OpListByMidsReq{ + Mids: midList, + Sort: []string{"_id"}, + }) + if err != nil { + logger.Error("_DefaultAccount OpListByMids fail, err: %v", err) + return + } + + streamerExts := make([]streamerproto.StreamerExtVO, len(accountList)) + for i := range streamerExts { + streamerExts[i] = &streamerproto.ApiListExtVO{} + } + ec := s.utilExtendAccountsIntoStreamerExts(ctx, accountList, streamerExts) + if ec != errcode.ErrCodeStreamerSrvOk { + err = fmt.Errorf("Extend accountlist into streamer_exts fail") + logger.Error("Extend accountlist into streamer_exts fail") + return + } + recommStreamerList = make([]*streamerproto.ApiListExtVO, len(streamerExts)) + for i, streamerExt := range streamerExts { + vo, _ := streamerExt.(*streamerproto.ApiListExtVO) + recommStreamerList[i] = vo + } + + return +} + +func (s *Service) utilGetDownStreamerRecommList(ctx *gin.Context, recommlist []int64, mid int64) (recommStreamerList []*streamerproto.ApiListExtVO, err error) { + // 获取用户游标 + offset := int64(0) + recommListLength := int64(len(recommlist)) + if consts.StreamerRecommThroughput < recommListLength { + offset, err = s.utilGetUserVisitOffset(ctx, mid, recommListLength) + if err != nil { + logger.Error("utilGetUserVisitOffset fail, err: %v", err) + return + } + } + + // 根据用户游标查询得到结果 + midList := make([]int64, 0) + for i := 0; i < int(math.Min(float64(consts.StreamerRecommThroughput), float64(recommListLength))); i++ { + index := (offset + int64(i)) % recommListLength + midList = append(midList, recommlist[index]) + } + accountList, err := _DefaultAccount.OpListByMids(ctx, &accountproto.OpListByMidsReq{ + Mids: midList, + Sort: []string{"_id"}, + }) + if err != nil { + logger.Error("_DefaultAccount OpListByMids fail, err: %v", err) + return + } + + streamerExts := make([]streamerproto.StreamerExtVO, len(accountList)) + for i := range streamerExts { + streamerExts[i] = &streamerproto.ApiListExtVO{} + } + ec := s.utilExtendAccountsIntoStreamerExts(ctx, accountList, streamerExts) + if ec != errcode.ErrCodeStreamerSrvOk { + err = fmt.Errorf("extend accountlist into streamer_exts fail") + logger.Error("Extend accountlist into streamer_exts fail") + return + } + recommStreamerList = make([]*streamerproto.ApiListExtVO, len(streamerExts)) + for i, streamerExt := range streamerExts { + vo, _ := streamerExt.(*streamerproto.ApiListExtVO) + recommStreamerList[i] = vo + } + + return +} diff --git a/dbstruct/user_visit_offset.go b/dbstruct/user_visit_offset.go new file mode 100644 index 00000000..037e3233 --- /dev/null +++ b/dbstruct/user_visit_offset.go @@ -0,0 +1,7 @@ +package dbstruct + +type UserVisitOffset struct { + Id int64 `json:"id" bson:"_id"` //id,用户的mid + StreamerRecommOffset int64 `json:"streamer_recomm_offset" bson:"streamer_recomm_offset"` //主播推荐列表偏移量 + Ver int64 `json:"ver" bson:"ver"` //乐观锁 +}