by Robin at 20240308

This commit is contained in:
Leufolium 2024-03-08 00:42:42 +08:00
parent a9161cfbd7
commit 7dec418bfa
10 changed files with 236 additions and 45 deletions

View File

@ -15,6 +15,12 @@ const (
DefaultPageSize = 10 //默认页长为10
)
// 推荐每次吞吐量
const (
StreamerRecommThroughput = 4
MomentRecommThroughput = 4
)
// 设备类型
const (
DevType_Android = 0
@ -70,7 +76,7 @@ const (
RedisStreamerPrefix = "streamer:" //streamer服务前缀
)
//const PackageRootPath = "C:/Users/PC/Desktop/wishpal_ironfan_service/service"
//const PackageRootPath = "C:/Users/PC/Desktop/service"
const PackageRootPath = "/app/wishpal-ironfan"
@ -78,7 +84,7 @@ const MainConfigPath = PackageRootPath + "/etc/mix/mix-test.yaml"
const ProductionConfigPath = PackageRootPath + "/etc/mix/mix-prod.yaml"
const LocalConfigPath = "/Users/erwin/wishpal/wishpal-ironfan/etc/mix/mix-local.yaml"
const LocalConfigPath = "C:/Users/PC/Desktop/service/etc/mix/mix-local.yaml"
const ReservedUserIdRegexesConfig = PackageRootPath + "/etc/mix/resource/reg_reserved_user_id_config.xml"

View File

@ -9,3 +9,8 @@ const (
TextAuditTaskUpdate_Pass = 0
TextAuditTaskUpdate_Rollback = 1
)
const (
Recomm_Down = 0
Recomm_Up = 1
)

View File

@ -150,6 +150,9 @@ var ErrCodeMsgMap = map[ErrCode]string{
ErrCodeMomentAuditTaskSrvFail: "动态审核任务表服务错误",
ErrCodeMomentAuditTaskNotExist: "动态审核任务表不存在",
ErrCodeUserVisitOffsetSrvFail: "用户游标表服务错误",
ErrCodeUserVisitOffsetNotExist: "用户游标表不存在",
}
const (
@ -358,6 +361,11 @@ const (
ErrCodeMomentAuditTaskSrvFail ErrCode = -29001 // 动态审核任务表服务错误
ErrCodeMomentAuditTaskNotExist ErrCode = -29002 // 动态审核任务表不存在
// UserVisitOffset: 30xxx
ErrCodeUserVisitOffsetSrvOk ErrCode = ErrCodeOk
ErrCodeUserVisitOffsetSrvFail ErrCode = -31001 // 用户游标表服务错误
ErrCodeUserVisitOffsetNotExist ErrCode = -31002 // 用户游标表不存在
// Media: 60xxx
ErrCodeMediaSrvOk ErrCode = ErrCodeOk
ErrCodeMediaSrvFail ErrCode = -60001 // 媒体服务错误

View File

@ -180,12 +180,11 @@ type ApiListStreamerWxIdResp struct {
// api 推荐
type ApiRecommListReq struct {
base.BaseRequest
Offset int `json:"offset"`
Limit int `json:"limit"`
OpType int64 `json:"op_type"`
}
type ApiRecommListData struct {
RecommList []int64 `json:"recomm_list"`
RecommList []*ApiListExtVO `json:"recomm_list"`
}
type ApiRecommListResp struct {

View File

@ -154,6 +154,9 @@ const (
DBMomentAuditTask = "moment_audit_task"
COLMomentAuditTask = "moment_audit_task"
DBUserVisitOffset = "user_visit_offset"
COLUserVisitOffset = "user_visit_offset"
)
// 商品表
@ -360,6 +363,11 @@ func (m *Mongo) getColMomentAuditTask() *qmgo.Collection {
return m.clientMix.Database(DBMomentAuditTask).Collection(COLMomentAuditTask)
}
// 用户访问偏移量表
func (m *Mongo) getColUserVisitOffset() *qmgo.Collection {
return m.clientMix.Database(DBUserVisitOffset).Collection(COLUserVisitOffset)
}
// 商品相关
func (m *Mongo) CreateProduct(ctx *gin.Context, product *dbstruct.Product) error {
col := m.getColProduct()
@ -3297,3 +3305,34 @@ func (m *Mongo) GetAppConfigListByKey(ctx *gin.Context, req *appconfigproto.OpLi
}
return appconfig, err
}
func (m *Mongo) CreateUserVisitOffset(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error {
col := m.getColUserVisitOffset()
_, err := col.InsertOne(ctx, uservisitoffset)
return err
}
func (m *Mongo) UpdateUserVisitOffset(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error {
col := m.getColUserVisitOffset()
filter := qmgo.M{
"_id": uservisitoffset.Id,
"ver": uservisitoffset.Ver,
}
update := qmgo.M{
"$set": qmgo.M{
"streamer_recomm_offset": uservisitoffset.StreamerRecommOffset,
},
}
err := col.UpdateOne(ctx, filter, update)
return err
}
func (m *Mongo) GetUserVisitOffset(ctx *gin.Context, mid int64) (*dbstruct.UserVisitOffset, error) {
uservisitoffset := &dbstruct.UserVisitOffset{}
col := m.getColUserVisitOffset()
query := qmgo.M{
"_id": mid,
}
err := col.Find(ctx, query).One(uservisitoffset)
return uservisitoffset, err
}

View File

@ -32,7 +32,6 @@ import (
"service/library/contentaudit/imageaudit"
"service/library/contentaudit/textaudit"
"service/library/logger"
"service/library/redis"
"go.mongodb.org/mongo-driver/mongo"
goproto "google.golang.org/protobuf/proto"
@ -1206,7 +1205,7 @@ func (s *Service) ApiGetStreamerWxId(ctx *gin.Context, req *streamerproto.ApiLis
}
// 推荐
func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto.ApiRecommListReq) (recommlist []int64, ec errcode.ErrCode) {
func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto.ApiRecommListReq) (recommStreamerList []*streamerproto.ApiListExtVO, ec errcode.ErrCode) {
ec = errcode.ErrCodeStreamerSrvOk
@ -1218,48 +1217,15 @@ func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto.
return nil, errcode.ErrCodeApolloReadFail
}
//1.从redis中获取数据
err = redis.GetRedisClient().GetObject(consts.RedisStreamerPrefix+"recomm_list", &recommlist)
// 从redis中获取主播列表
recommlist, err := s.utilGetStreamerRecommList(ctx)
if err != nil {
logger.Error("Redis read failed : %v", err)
ec = errcode.ErrCodeStreamerRecommListRedisCacheInvalid
logger.Error("utilGetStreamerRecommList fail, err: %v", err)
ec = errcode.ErrCodeStreamerSrvFail
return
}
//2.若redis命中失败再从数据库查
if len(recommlist) == 0 {
logger.Error("Redis hit failed, reading recommendation list from mongo...")
// recommlist, _, err = _DefaultAccountRelation.OpIsFollowedCount(ctx)
// if err != nil {
// logger.Error("OpIsFollowedCount fail, err: %v", err)
// ec = errcode.ErrCodeAccountRelationSrvFail
// return
// }
list, err := _DefaultStreamer.OpList(ctx, &streamerproto.OpListReq{
Sort: "-fans",
})
if err != nil {
logger.Error("OpList fail, err: %v", err)
ec = errcode.ErrCodeAccountRelationSrvFail
return
}
recommlist = make([]int64, len(list))
for i, streamer := range list {
recommlist[i] = util.DerefInt64(streamer.Mid)
}
//若数据库命中成功则立即加载进redis
if len(recommlist) != 0 {
err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", recommlist, 0)
if err != nil {
logger.Error("Redis cache fail, err: %v", err)
}
}
}
// 查询是否限制访客
visitorMid := req.BaseRequest.Mid
for _, restricted_visitor_mid := range cfg.RestrictedVisitorMids {
if restricted_visitor_mid == visitorMid { // 是限制访问的访客
@ -1278,6 +1244,45 @@ func (s *Service) ApiGetStreamerRecommList(ctx *gin.Context, req *streamerproto.
}
}
// 获取用户游标
recommListLength := int64(len(recommlist))
offset, err := s.utilGetUserVisitOffset(ctx, req.Mid, recommListLength)
if err != nil {
logger.Error("utilGetUserVisitOffset fail, err: %v", err)
ec = errcode.ErrCodeUserVisitOffsetSrvFail
return
}
// 根据用户游标查询得到结果
midList := make([]int64, 0)
for i := 0; i < consts.StreamerRecommThroughput; i++ {
index := (offset + int64(i)) % recommListLength
midList = append(midList, recommlist[index])
}
accountList, err := _DefaultAccount.OpListByMids(ctx, &accountproto.OpListByMidsReq{
Mids: midList,
})
if err != nil {
logger.Error("_DefaultAccount OpListByMids fail, err: %v", err)
ec = errcode.ErrCodeStreamerSrvFail
return
}
streamerExts := make([]streamerproto.StreamerExtVO, len(accountList))
for i := range streamerExts {
streamerExts[i] = &streamerproto.ApiListExtVO{}
}
ec = s.utilExtendAccountsIntoStreamerExts(ctx, accountList, streamerExts)
if ec != errcode.ErrCodeStreamerSrvOk {
logger.Error("Extend accountlist into streamer_exts fail, req: %v, err: %v", util.ToJson(req), err)
return
}
recommStreamerList = make([]*streamerproto.ApiListExtVO, len(streamerExts))
for i, streamerExt := range streamerExts {
vo, _ := streamerExt.(*streamerproto.ApiListExtVO)
recommStreamerList[i] = vo
}
return
}

View File

@ -0,0 +1,49 @@
package logic
import (
"service/app/mix/dao"
"service/dbstruct"
"service/library/logger"
"github.com/gin-gonic/gin"
)
type UserVisitOffset struct {
store *dao.Store
}
func NewUserVisitOffset(store *dao.Store) (a *UserVisitOffset) {
a = &UserVisitOffset{
store: store,
}
return
}
func (p *UserVisitOffset) OpCreate(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error {
err := p.store.CreateUserVisitOffset(ctx, uservisitoffset)
if err != nil {
logger.Error("CreateUserVisitOffset fail, err: %v", err)
return err
}
return nil
}
func (p *UserVisitOffset) OpUpdate(ctx *gin.Context, uservisitoffset *dbstruct.UserVisitOffset) error {
err := p.store.UpdateUserVisitOffset(ctx, uservisitoffset)
if err != nil {
logger.Error("UpdateUserVisitOffset fail, err: %v", err)
return err
}
return nil
}
func (p *UserVisitOffset) OpGetUserVisitOffset(ctx *gin.Context, mid int64) (*dbstruct.UserVisitOffset, error) {
uservisitoffset, err := p.store.GetUserVisitOffset(ctx, mid)
if err != nil {
logger.Error("GetUserVisitOffset fail, err: %v", err)
return nil, err
}
return uservisitoffset, nil
}

View File

@ -100,6 +100,7 @@ var (
_DefaultXxlJob *logic.XxlJob
_DefaultAppConfig *logic.AppConfig
_DefaultMomentAuditTask *logic.MomentAuditTask
_DefaultUserVisitOffset *logic.UserVisitOffset
)
type Service struct {
@ -176,6 +177,7 @@ func (s *Service) Init(c any) (err error) {
_DefaultXxlJob = logic.NewXxlJob(store, cfg.XxlJob)
_DefaultAppConfig = logic.NewAppConfig(store)
_DefaultMomentAuditTask = logic.NewMomentAuditTask(store)
_DefaultUserVisitOffset = logic.NewUserVisitOffset(store)
return
}

View File

@ -21,6 +21,7 @@ import (
"service/bizcommon/util"
"service/dbstruct"
"service/library/logger"
"service/library/redis"
"strings"
"time"
@ -626,3 +627,73 @@ func (s *Service) utilStringifyContactCustomerServices(ctx *gin.Context, contact
msg = msgBuilder.String()
return
}
func (s *Service) utilGetStreamerRecommList(ctx *gin.Context) (recommlist []int64, err error) {
// 1.从redis中获取数据
err = redis.GetRedisClient().GetObject(consts.RedisStreamerPrefix+"recomm_list", &recommlist)
if err != nil {
logger.Error("Redis read failed : %v", err)
return
}
// 2.若redis命中失败再从数据库查
if len(recommlist) == 0 {
logger.Error("Redis hit failed, reading recommendation list from mongo...")
list, err := _DefaultStreamer.OpList(ctx, &streamerproto.OpListReq{
Sort: "-fans",
})
if err != nil {
logger.Error("OpList fail, err: %v", err)
return nil, err
}
recommlist = make([]int64, len(list))
for i, streamer := range list {
recommlist[i] = util.DerefInt64(streamer.Mid)
}
//若数据库命中成功则立即加载进redis
if len(recommlist) != 0 {
err := redis.GetRedisClient().Set(consts.RedisStreamerPrefix+"recomm_list", recommlist, 0)
if err != nil {
logger.Error("Redis cache fail, err: %v", err)
}
}
}
return
}
func (s *Service) utilGetUserVisitOffset(ctx *gin.Context, mid int64, recommlistLength int64) (offset int64, err error) {
offset = int64(0)
uservisitoffset, err := _DefaultUserVisitOffset.OpGetUserVisitOffset(ctx, mid)
if err != nil {
logger.Error("OpGetUserVisitOffset fail, err: %v", err)
return
}
if uservisitoffset == nil {
err = _DefaultUserVisitOffset.OpCreate(ctx, &dbstruct.UserVisitOffset{
Id: mid,
StreamerRecommOffset: 0,
Ver: 0,
})
if err != nil {
logger.Error("_DefaultUserVisitOffset OpCreate fail, err: %v", err)
return
}
} else {
offset = uservisitoffset.StreamerRecommOffset
nowoffset := (offset + consts.StreamerRecommThroughput) % recommlistLength
err = _DefaultUserVisitOffset.OpUpdate(ctx, &dbstruct.UserVisitOffset{
Id: uservisitoffset.Id,
StreamerRecommOffset: nowoffset,
Ver: uservisitoffset.Ver,
})
if err != nil {
logger.Error("_DefaultUserVisitOffset OpUpdate fail, err: %v", err)
return
}
}
return
}

View File

@ -0,0 +1,7 @@
package dbstruct
type UserVisitOffset struct {
Id int64 `json:"id" bson:"_id"` //id,用户的mid
StreamerRecommOffset int64 `json:"streamer_recomm_offset" bson:"streamer_recomm_offset"` //主播推荐列表偏移量
Ver int64 `json:"ver" bson:"ver"` //乐观锁
}