Merge branch 'feat-IRONFANS-58-Robin' into conflict

This commit is contained in:
Leufolium 2024-03-08 01:30:30 +08:00
commit 960adc1455
10 changed files with 311 additions and 45 deletions

View File

@ -15,6 +15,12 @@ const (
DefaultPageSize = 10 //默认页长为10
)
// 推荐每次吞吐量
const (
StreamerRecommThroughput = 4
MomentRecommThroughput = 4
)
// 设备类型
const (
DevType_Android = 0
@ -70,7 +76,7 @@ const (
RedisStreamerPrefix = "streamer:" //streamer服务前缀
)
//const PackageRootPath = "C:/Users/PC/Desktop/wishpal_ironfan_service/service"
//const PackageRootPath = "C:/Users/PC/Desktop/service"
const PackageRootPath = "/app/wishpal-ironfan"
@ -78,7 +84,7 @@ const MainConfigPath = PackageRootPath + "/etc/mix/mix-test.yaml"
const ProductionConfigPath = PackageRootPath + "/etc/mix/mix-prod.yaml"
const LocalConfigPath = "/Users/erwin/wishpal/wishpal-ironfan/etc/mix/mix-local.yaml"
const LocalConfigPath = "C:/Users/PC/Desktop/service/etc/mix/mix-local.yaml"
const ReservedUserIdRegexesConfig = PackageRootPath + "/etc/mix/resource/reg_reserved_user_id_config.xml"

View File

@ -9,3 +9,8 @@ const (
TextAuditTaskUpdate_Pass = 0
TextAuditTaskUpdate_Rollback = 1
)
const (
Recomm_Down = 0
Recomm_Up = 1
)

View File

@ -157,6 +157,9 @@ var ErrCodeMsgMap = map[ErrCode]string{
ErrCodeAccountCancellationSrvFail: "账户注销服务错误",
ErrCodeAccountCancellationNotExist: "账户注销不存在",
ErrCodeUserVisitOffsetSrvFail: "用户游标表服务错误",
ErrCodeUserVisitOffsetNotExist: "用户游标表不存在",
}
const (
@ -374,6 +377,11 @@ const (
ErrCodeAccountCancellationSrvFail ErrCode = -30001 // 账户注销服务错误
ErrCodeAccountCancellationNotExist ErrCode = -30002 // 账户注销不存在
// UserVisitOffset: 30xxx
ErrCodeUserVisitOffsetSrvOk ErrCode = ErrCodeOk
ErrCodeUserVisitOffsetSrvFail ErrCode = -31001 // 用户游标表服务错误
ErrCodeUserVisitOffsetNotExist ErrCode = -31002 // 用户游标表不存在
// Media: 60xxx
ErrCodeMediaSrvOk ErrCode = ErrCodeOk
ErrCodeMediaSrvFail ErrCode = -60001 // 媒体服务错误

View File

@ -180,12 +180,11 @@ type ApiListStreamerWxIdResp struct {
// api 推荐
type ApiRecommListReq struct {
base.BaseRequest
Offset int `json:"offset"`
Limit int `json:"limit"`
OpType int64 `json:"op_type"`
}
type ApiRecommListData struct {
RecommList []int64 `json:"recomm_list"`
RecommList []*ApiListExtVO `json:"recomm_list"`
}
type ApiRecommListResp struct {

View File

@ -160,6 +160,9 @@ const (
DBAccountCancellation = "account_cancellation"
COLAccountCancellation = "account_cancellation"
DBUserVisitOffset = "user_visit_offset"
COLUserVisitOffset = "user_visit_offset"
)
// 商品表
@ -381,6 +384,11 @@ func (m *Mongo) getColAccountCancellation() *qmgo.Collection {
return m.clientMix.Database(DBAccountCancellation).Collection(COLAccountCancellation)
}
// 用户访问偏移量表
func (m *Mongo) getColUserVisitOffset() *qmgo.Collection {
return m.clientMix.Database(DBUserVisitOffset).Collection(COLUserVisitOffset)
}
// 商品相关
func (m *Mongo) CreateProduct(ctx *gin.Context, product *dbstruct.Product) error {
col := m.getColProduct()
@ -3469,3 +3477,38 @@ func (m *Mongo) GetAccountCancellationListByMid(ctx *gin.Context, req *account_c
}
return accountCancellation, err
}
func (m *Mongo) CreateUserVisitOffset(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error {
col := m.getColUserVisitOffset()
_, err := col.InsertOne(ctx, uservisitoffset)
return err
}
func (m *Mongo) UpdateUserVisitOffset(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error {
col := m.getColUserVisitOffset()
filter := qmgo.M{
"_id": uservisitoffset.Id,
"ver": uservisitoffset.Ver,
}
update := qmgo.M{
"$set": qmgo.M{
"streamer_recomm_offset": uservisitoffset.StreamerRecommOffset,
},
}
err := col.UpdateOne(ctx, filter, update)
return err
}
func (m *Mongo) GetUserVisitOffset(ctx *gin.Context, mid int64) (*dbstruct.UserVisitOffset, error) {
uservisitoffset := &dbstruct.UserVisitOffset{}
col := m.getColUserVisitOffset()
query := qmgo.M{
"_id": mid,
}
err := col.Find(ctx, query).One(uservisitoffset)
if err == qmgo.ErrNoSuchDocuments {
err = nil
return nil, err
}
return uservisitoffset, err
}

View File

@ -33,7 +33,6 @@ import (
"service/library/contentaudit/imageaudit"
"service/library/contentaudit/textaudit"
"service/library/logger"
"service/library/redis"
"time"
"go.mongodb.org/mongo-driver/mongo"
@ -1339,7 +1338,7 @@ func (s *Service) ApiGetStreamerWxId(ctx *gin.Context, req *streamerproto.ApiLis
}
// 推荐
func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto.ApiRecommListReq) (recommlist []int64, ec errcode.ErrCode) {
func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto.ApiRecommListReq) (recommStreamerList []*streamerproto.ApiListExtVO, ec errcode.ErrCode) {
ec = errcode.ErrCodeStreamerSrvOk
@ -1351,48 +1350,15 @@ func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto.
return nil, errcode.ErrCodeApolloReadFail
}
//1.从redis中获取数据
err = redis.GetRedisClient().GetObject(consts.RedisStreamerPrefix+"recomm_list", &recommlist)
// 从redis中获取主播列表
recommlist, err := s.utilGetStreamerRecommList(ctx)
if err != nil {
logger.Error("Redis read failed : %v", err)
ec = errcode.ErrCodeStreamerRecommListRedisCacheInvalid
logger.Error("utilGetStreamerRecommList fail, err: %v", err)
ec = errcode.ErrCodeStreamerSrvFail
return
}
//2.若redis命中失败再从数据库查
if len(recommlist) == 0 {
logger.Error("Redis hit failed, reading recommendation list from mongo...")
// recommlist, _, err = _DefaultAccountRelation.OpIsFollowedCount(ctx)
// if err != nil {
// logger.Error("OpIsFollowedCount fail, err: %v", err)
// ec = errcode.ErrCodeAccountRelationSrvFail
// return
// }
list, err := _DefaultStreamer.OpList(ctx, &streamerproto.OpListReq{
Sort: "-fans",
})
if err != nil {
logger.Error("OpList fail, err: %v", err)
ec = errcode.ErrCodeAccountRelationSrvFail
return
}
recommlist = make([]int64, len(list))
for i, streamer := range list {
recommlist[i] = util.DerefInt64(streamer.Mid)
}
//若数据库命中成功则立即加载进redis
if len(recommlist) != 0 {
err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", recommlist, 0)
if err != nil {
logger.Error("Redis cache fail, err: %v", err)
}
}
}
// 查询是否限制访客
visitorMid := req.BaseRequest.Mid
for _, restricted_visitor_mid := range cfg.RestrictedVisitorMids {
if restricted_visitor_mid == visitorMid { // 是限制访问的访客
@ -1411,6 +1377,22 @@ func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto.
}
}
if req.OpType == consts.Recomm_Up {
recommStreamerList, err = s.utilGetUpStreamerRecommList(ctx, recommlist, req.Mid)
if err != nil {
logger.Error("utilGetUpStreamerRecommList fail, err: %v", err)
ec = errcode.ErrCodeStreamerSrvFail
return
}
} else {
recommStreamerList, err = s.utilGetUpStreamerRecommList(ctx, recommlist, req.Mid)
if err != nil {
logger.Error("utilGetUpStreamerRecommList fail, err: %v", err)
ec = errcode.ErrCodeStreamerSrvFail
return
}
}
return
}

View File

@ -0,0 +1,49 @@
package logic
import (
"service/app/mix/dao"
"service/dbstruct"
"service/library/logger"
"github.com/gin-gonic/gin"
)
type UserVisitOffset struct {
store *dao.Store
}
func NewUserVisitOffset(store *dao.Store) (a *UserVisitOffset) {
a = &UserVisitOffset{
store: store,
}
return
}
func (p *UserVisitOffset) OpCreate(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error {
err := p.store.CreateUserVisitOffset(ctx, uservisitoffset)
if err != nil {
logger.Error("CreateUserVisitOffset fail, err: %v", err)
return err
}
return nil
}
func (p *UserVisitOffset) OpUpdate(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error {
err := p.store.UpdateUserVisitOffset(ctx, uservisitoffset)
if err != nil {
logger.Error("UpdateUserVisitOffset fail, err: %v", err)
return err
}
return nil
}
func (p *UserVisitOffset) OpGetUserVisitOffset(ctx *gin.Context, mid int64) (*dbstruct.UserVisitOffset, error) {
uservisitoffset, err := p.store.GetUserVisitOffset(ctx, mid)
if err != nil {
logger.Error("GetUserVisitOffset fail, err: %v", err)
return nil, err
}
return uservisitoffset, nil
}

View File

@ -102,6 +102,7 @@ var (
_DefaultXxlJob *logic.XxlJob
_DefaultMomentAuditTask *logic.MomentAuditTask
_DefaultAccountCancellation *logic.AccountCancellation
_DefaultUserVisitOffset *logic.UserVisitOffset
)
type Service struct {
@ -179,6 +180,7 @@ func (s *Service) Init(c any) (err error) {
_DefaultXxlJob = logic.NewXxlJob(store, cfg.XxlJob)
_DefaultMomentAuditTask = logic.NewMomentAuditTask(store)
_DefaultAccountCancellation = logic.NewAccountCancellation(store)
_DefaultUserVisitOffset = logic.NewUserVisitOffset(store)
return
}

View File

@ -2,6 +2,7 @@ package service
import (
"fmt"
"math"
"service/api/consts"
"service/api/errcode"
"service/api/interfaces"
@ -23,6 +24,7 @@ import (
"service/dbstruct"
"service/library/apollo"
"service/library/logger"
"service/library/redis"
"strings"
"time"
@ -721,3 +723,166 @@ func (s *Service) utilCancelAccountByMids(ctx *gin.Context, midList []int64) err
return nil
}
func (s *Service) utilGetStreamerRecommList(ctx *gin.Context) (recommlist []int64, err error) {
// 1.从redis中获取数据
err = redis.GetRedisClient().GetObject(consts.RedisStreamerPrefix+"recomm_list", &recommlist)
if err != nil {
logger.Error("Redis read failed : %v", err)
return
}
// 2.若redis命中失败再从数据库查
if len(recommlist) == 0 {
logger.Error("Redis hit failed, reading recommendation list from mongo...")
list, err := _DefaultStreamer.OpList(ctx, &streamerproto.OpListReq{
Sort: "-fans",
})
if err != nil {
logger.Error("OpList fail, err: %v", err)
return nil, err
}
recommlist = make([]int64, len(list))
for i, streamer := range list {
recommlist[i] = util.DerefInt64(streamer.Mid)
}
//若数据库命中成功则立即加载进redis
if len(recommlist) != 0 {
err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", recommlist, 0)
if err != nil {
logger.Error("Redis cache fail, err: %v", err)
}
}
}
return
}
func (s *Service) utilGetUserVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (offset int64, err error) {
offset = int64(0)
uservisitoffset, err := _DefaultUserVisitOffset.OpGetUserVisitOffset(ctx, mid)
if err != nil {
logger.Error("OpGetUserVisitOffset fail, err: %v", err)
return
}
if uservisitoffset == nil {
nowoffset := (offset + consts.StreamerRecommThroughput) % recommlistLength
err = _DefaultUserVisitOffset.OpCreate(ctx, &dbstruct.UserVisitOffset{
Id: mid,
StreamerRecommOffset: nowoffset,
Ver: 0,
})
if err != nil {
logger.Error("_DefaultUserVisitOffset OpCreate fail, err: %v", err)
return
}
} else {
offset = uservisitoffset.StreamerRecommOffset
nowoffset := (offset + consts.StreamerRecommThroughput) % recommlistLength
err = _DefaultUserVisitOffset.OpUpdate(ctx, &dbstruct.UserVisitOffset{
Id: uservisitoffset.Id,
StreamerRecommOffset: nowoffset,
Ver: uservisitoffset.Ver,
})
if err != nil {
logger.Error("_DefaultUserVisitOffset OpUpdate fail, err: %v", err)
return
}
}
return
}
func (s *Service) utilGetUpStreamerRecommList(ctx *gin.Context, recommlist []int64, mid int64) (recommStreamerList []*streamerproto.ApiListExtVO, err error) {
// 获取用户游标
offset := int64(0)
recommListLength := int64(len(recommlist))
if consts.StreamerRecommThroughput < recommListLength {
offset, err = s.utilGetUserVisitOffset(ctx, mid, recommListLength)
if err != nil {
logger.Error("utilGetUserVisitOffset fail, err: %v", err)
return
}
}
// 根据用户游标查询得到结果
midList := make([]int64, 0)
for i := 0; i < int(math.Min(float64(consts.StreamerRecommThroughput), float64(recommListLength))); i++ {
index := (offset + int64(i)) % recommListLength
midList = append(midList, recommlist[index])
}
accountList, err := _DefaultAccount.OpListByMids(ctx, &accountproto.OpListByMidsReq{
Mids: midList,
Sort: []string{"_id"},
})
if err != nil {
logger.Error("_DefaultAccount OpListByMids fail, err: %v", err)
return
}
streamerExts := make([]streamerproto.StreamerExtVO, len(accountList))
for i := range streamerExts {
streamerExts[i] = &streamerproto.ApiListExtVO{}
}
ec := s.utilExtendAccountsIntoStreamerExts(ctx, accountList, streamerExts)
if ec != errcode.ErrCodeStreamerSrvOk {
err = fmt.Errorf("Extend accountlist into streamer_exts fail")
logger.Error("Extend accountlist into streamer_exts fail")
return
}
recommStreamerList = make([]*streamerproto.ApiListExtVO, len(streamerExts))
for i, streamerExt := range streamerExts {
vo, _ := streamerExt.(*streamerproto.ApiListExtVO)
recommStreamerList[i] = vo
}
return
}
func (s *Service) utilGetDownStreamerRecommList(ctx *gin.Context, recommlist []int64, mid int64) (recommStreamerList []*streamerproto.ApiListExtVO, err error) {
// 获取用户游标
offset := int64(0)
recommListLength := int64(len(recommlist))
if consts.StreamerRecommThroughput < recommListLength {
offset, err = s.utilGetUserVisitOffset(ctx, mid, recommListLength)
if err != nil {
logger.Error("utilGetUserVisitOffset fail, err: %v", err)
return
}
}
// 根据用户游标查询得到结果
midList := make([]int64, 0)
for i := 0; i < int(math.Min(float64(consts.StreamerRecommThroughput), float64(recommListLength))); i++ {
index := (offset + int64(i)) % recommListLength
midList = append(midList, recommlist[index])
}
accountList, err := _DefaultAccount.OpListByMids(ctx, &accountproto.OpListByMidsReq{
Mids: midList,
Sort: []string{"_id"},
})
if err != nil {
logger.Error("_DefaultAccount OpListByMids fail, err: %v", err)
return
}
streamerExts := make([]streamerproto.StreamerExtVO, len(accountList))
for i := range streamerExts {
streamerExts[i] = &streamerproto.ApiListExtVO{}
}
ec := s.utilExtendAccountsIntoStreamerExts(ctx, accountList, streamerExts)
if ec != errcode.ErrCodeStreamerSrvOk {
err = fmt.Errorf("extend accountlist into streamer_exts fail")
logger.Error("Extend accountlist into streamer_exts fail")
return
}
recommStreamerList = make([]*streamerproto.ApiListExtVO, len(streamerExts))
for i, streamerExt := range streamerExts {
vo, _ := streamerExt.(*streamerproto.ApiListExtVO)
recommStreamerList[i] = vo
}
return
}

View File

@ -0,0 +1,7 @@
package dbstruct
type UserVisitOffset struct {
Id int64 `json:"id" bson:"_id"` //id,用户的mid
StreamerRecommOffset int64 `json:"streamer_recomm_offset" bson:"streamer_recomm_offset"` //主播推荐列表偏移量
Ver int64 `json:"ver" bson:"ver"` //乐观锁
}